

## Cheatsheet

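- [DataCamp: PySpark cheat sheet (Spark DataFrames in Python)](https://www.datacamp.com/cheat-sheet/pyspark-cheat-sheet-spark-dataframes-in-python)
- [Packt: PySpark Cookbook (GitHub repository)](https://github.com/PacktPublishing/PySpark-Cookbook/tree/master)
- [Learning Apache Spark (PDF)](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf)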





In [0]:
# Toy corpus: three short passages, one per row, in a single 'text' column.
# The triple-quoted literals are kept verbatim (leading newline, indentation
# and trailing spaces included) because that whitespace is part of the data.
some_text = spark.createDataFrame(
    data=[
        ['''
    Apache Spark achieves high performance for both batch
    and streaming data, using a state-of-the-art DAG scheduler, 
    a query optimizer, and a physical execution engine.
    '''],
        ['''
    Apache Spark is a fast and general-purpose cluster computing 
    system. It provides high-level APIs in Java, Scala, Python 
    and R, and an optimized engine that supports general execution 
    graphs. It also supports a rich set of higher-level tools including 
    Spark SQL for SQL and structured data processing, MLlib for machine 
    learning, GraphX for graph processing, and Spark Streaming.
    '''],
        ['''
    Machine learning is a field of computer science that often uses 
    statistical techniques to give computers the ability to "learn" 
    (i.e., progressively improve performance on a specific task) 
    with data, without being explicitly programmed.
    '''],
    ],
    schema=['text'],
)

In [0]:

# Tokeniser for the 'text' column; tokens are written to 'text_split'.
# The pattern matches a run of whitespace, or a single comma, period or
# double quote. It is written as a raw string so that `\s` reaches the regex
# engine as-is instead of relying on Python passing through an invalid
# string escape (which emits a DeprecationWarning/SyntaxWarning on modern
# Python). The resulting pattern text is unchanged.
splitter = feat.RegexTokenizer(
    inputCol='text'
    , outputCol='text_split'
    , pattern=r'\s+|[,."]'
)

In [0]:
# Tokenise the corpus and show the token list of the first row.
# NOTE(review): the saved output for this cell contains text that is not in
# `some_text`, so it looks stale — re-run to refresh it.
(
    splitter
    .transform(some_text)
    .select('text_split')
    .take(1)
)

Out[8]: [Row(text_split=['machine', 'learning', '(ml)', 'is', 'a', 'field', 'of', 'study', 'in', 'artificial', 'intelligence', 'concerned', 'with', 'the', 'development', 'and', 'study', 'of', 'statistical', 'algorithms', 'that', 'can', 'learn', 'from', 'data', 'and', 'generalize', 'to', 'unseen', 'data', 'and', 'thus', 'perform', 'tasks', 'without', 'explicit', 'instructions', '[1]', 'recently', 'generative', 'artificial', 'neural', 'networks', 'have', 'been', 'able', 'to', 'surpass', 'many', 'previous', 'approaches', 'in', 'performance'])]

In [0]:
# Stop-word filter: reads whatever column the tokeniser writes to and emits
# the filtered token list as 'no_stopWords'.
sw_remover = feat.StopWordsRemover(
    inputCol=splitter.getOutputCol(),
    outputCol='no_stopWords',
)

In [0]:
# Tokenise, strip stop words, and show the first row's remaining tokens.
(
    sw_remover
    .transform(splitter.transform(some_text))
    .select('no_stopWords')
    .take(1)
)

Out[10]: [Row(no_stopWords=['machine', 'learning', '(ml)', 'field', 'study', 'artificial', 'intelligence', 'concerned', 'development', 'study', 'statistical', 'algorithms', 'learn', 'data', 'generalize', 'unseen', 'data', 'thus', 'perform', 'tasks', 'without', 'explicit', 'instructions', '[1]', 'recently', 'generative', 'artificial', 'neural', 'networks', 'able', 'surpass', 'many', 'previous', 'approaches', 'performance'])]

In [0]:
# Term-frequency stage: hashes the stop-word-free tokens into a vector with
# 20 slots, stored in the 'hashed' column.
hasher = feat.HashingTF(
    inputCol=sw_remover.getOutputCol(),
    outputCol='hashed',
    numFeatures=20,
)

In [0]:
# Run tokeniser -> stop-word filter -> hasher and show the first row's
# hashed term-frequency vector.
(
    hasher
    .transform(
        sw_remover.transform(
            splitter.transform(some_text)
        )
    )
    .select('hashed')
    .take(1)
)

Out[12]: [Row(hashed=SparseVector(20, {0: 3.0, 1: 1.0, 3: 2.0, 4: 3.0, 5: 2.0, 6: 2.0, 7: 1.0, 8: 1.0, 10: 1.0, 11: 3.0, 12: 3.0, 13: 1.0, 14: 5.0, 15: 2.0, 16: 3.0, 17: 1.0, 18: 1.0}))]

In [0]:
# IDF estimator: consumes the hasher's output column and writes the
# re-weighted vector to 'features'. It has to be fitted before use.
idf = feat.IDF(
    inputCol=hasher.getOutputCol(),
    outputCol='features',
)

In [0]:
# Fit the IDF estimator on the hashed term frequencies of the whole corpus.
idfModel = idf.fit(
    hasher.transform(
        sw_remover.transform(
            splitter.transform(some_text)
        )
    )
)

In [0]:
# Apply the fitted IDF model to the same hashed frame and show the first
# row's 'features' vector.
(
    idfModel
    .transform(
        hasher.transform(
            sw_remover.transform(
                splitter.transform(some_text)
            )
        )
    )
    .select('features')
    .take(1)
)

Out[15]: [Row(features=SparseVector(20, {0: 0.0, 1: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 10: 0.0, 11: 0.0, 12: 0.0, 13: 0.0, 14: 0.0, 15: 0.0, 16: 1.2164, 17: 0.0, 18: 0.4055}))]

In [0]:
from pyspark.ml import Pipeline

In [0]:
# Bundle the four stages built above into one Pipeline so the manual
# nesting of transform() calls is no longer needed.
pipeline = Pipeline(
    stages=[
        splitter,
        sw_remover,
        hasher,
        idf,
    ]
)

# Fitting runs every stage in order on the raw text; the fitted model then
# goes from 'text' straight to 'features'.
pipelineModel = pipeline.fit(some_text)
(
    pipelineModel
    .transform(some_text)
    .select('text', 'features')
    .take(1)
)

Out[17]: [Row(text='Machine learning (ML) is a field of study in artificial intelligence concerned with the development and study of statistical algorithms that can learn from data and generalize to unseen data, and thus perform tasks without explicit instructions.[1] Recently, generative artificial neural networks have been able to surpass many previous approaches in performance.', features=SparseVector(20, {0: 0.0, 1: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 10: 0.0, 11: 0.0, 12: 0.0, 13: 0.0, 14: 0.0, 15: 0.0, 16: 1.2164, 17: 0.0, 18: 0.4055}))]

In [0]:
# Word2Vec estimator over the stop-word-free tokens: 5-dimensional vectors,
# written to the 'vector' column, with minCount=2.
# Training is stochastic, so the seed is pinned to keep the embedding
# repeatable across kernel restarts (without it, the default seed PySpark
# picks is not guaranteed to be the same from one Python session to the next).
w2v = feat.Word2Vec(
    vectorSize=5
    , minCount=2
    , inputCol=sw_remover.getOutputCol()
    , outputCol='vector'
    , seed=42
)

In [0]:
# Build the Word2Vec input (tokenised, stop words removed) once, fit on it,
# then show the 'vector' column of the first row.
w2v_input = sw_remover.transform(splitter.transform(some_text))
model = w2v.fit(w2v_input)
(
    model
    .transform(w2v_input)
    .select('vector')
    .take(1)
)

Out[21]: [Row(vector=DenseVector([0.0033, -0.001, 0.0013, -0.0091, -0.0094]))]