# Spark's Structured APIs

In [27]:
from pyspark.sql import SparkSession

# Build a SparkSession named "Spark_Session_2", or reuse the one already
# running in this kernel (that is what getOrCreate() does).
spark = (
    SparkSession.builder
    .appName("Spark_Session_2")
    .getOrCreate()
)

# Use the DataFrame API

In [25]:
from pyspark.sql.functions import avg

In [None]:
# Small in-memory DataFrame with an explicit DDL schema string.
data_df = spark.createDataFrame(
    [("Tom", 10), ("Jerry", 20), ("Lays", 30)],
    schema='name string, age int',
)

# Average age per name. Aggregating with the imported `avg` function names the
# column explicitly; a bare GroupedData.avg() averages *every* numeric column,
# which would silently change the output if more numeric columns were added.
data_df.groupBy('name').agg(avg('age')).show()


+-----+--------+
| name|avg(age)|
+-----+--------+
|  Tom|    10.0|
| Lays|    30.0|
|Jerry|    20.0|
+-----+--------+



# Use RDD APIs

Transformations used in the following WordCount example: `flatMap` and `map`.

I learned to be careful with the spelling: the M in `flatMap` is uppercase.


# WordCount example

In [23]:
from pyspark.sql.functions import desc

In [28]:
# Method 1. Read a text file into an RDD of lines. A local-file URI needs three
# slashes ("file://" + "/home/..."); with two, "home" is parsed as a host name.
# fileRDD = spark.sparkContext.textFile("file:///home/file_folder/inputText")

# Method 2. Use a string to do word count
fileRDD = spark.sparkContext.parallelize(["hello word count hello"])

# Split each line on whitespace (so repeated spaces don't produce empty "words",
# matching the RDD version below), wrap each word in a 1-tuple so it becomes a
# row, and name the single column 'word'.
wordsDF = fileRDD.flatMap(lambda x: x.split()).map(lambda x: (x,)).toDF("word string")

# or
# from pyspark.sql.types import StringType
# wordsDF = spark.createDataFrame(fileRDD.flatMap(lambda x: x.split()), StringType()).withColumnRenamed("value", "word")

countDF = wordsDF.groupBy("word").count()

# show output without sorting (row order is not guaranteed)
countDF.show()

# show output sorted by count in descending order; ties are broken
# alphabetically by word so the displayed order is deterministic
countDF.sort(desc("count"), "word").show()

+-----+-----+
| word|count|
+-----+-----+
|hello|    2|
|count|    1|
| word|    1|
+-----+-----+

+-----+-----+
| word|count|
+-----+-----+
|hello|    2|
|count|    1|
| word|    1|
+-----+-----+



# WordCount example (RDD version)

In [20]:
from pyspark import SparkContext, SparkConf

In [19]:
# Stop any Spark entry points left over from earlier cells before a new
# SparkContext is created in the next cell. Only one SparkContext may be active
# at a time, and the `spark` session created at the top of the notebook already
# owns one, so stopping just `sc` still leads to the "Cannot run multiple
# SparkContexts at once" error on a Restart & Run All.
for _entry_point_name in ("sc", "spark"):
    _entry_point = globals().get(_entry_point_name)
    if _entry_point is not None:  # skip names never defined in this kernel
        _entry_point.stop()  # SparkSession.stop() also stops its SparkContext

In [22]:
conf = SparkConf().setMaster("local").setAppName("wordcount_rdd")  # run Spark locally, name the app
sc = SparkContext(conf=conf)  # entry point for the RDD API

# or can do this directly
# sc = SparkContext('local', "wordcount")

try:
    # Method 1. Read from a file: each line of a plain text file becomes one RDD element
    # text = sc.textFile("text.txt")

    # Method 2. Create a line of text instead of reading from a file
    line = "hi lets do word count , count again and again"
    text = sc.parallelize([line])  # convert a list of strings into an RDD

    # Split lines on whitespace, pair each word with 1, then sum the 1s per word.
    # Named `word_counts` (not `count`) so the loop variable below cannot shadow it.
    word_counts = (
        text.flatMap(lambda text_line: text_line.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )

    # Either print the results or save them. saveAsTextFile writes a directory of
    # plain-text part files (the string form of each (word, n) pair), not a CSV.
    # word_counts.saveAsTextFile("results_rdd")

    for word, n in word_counts.collect():
        print(word, n)
finally:
    sc.stop()  # release the SparkContext even if a step above raises

hi 1
lets 1
do 1
word 1
count 2
, 1
again 2
and 1


# WordCount example (DataFrame API version)

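- This version is the more modern approach: DataFrame queries are optimized by Spark's Catalyst optimizer, which hand-written RDD code does not benefit from.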

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [18]:
# Import exactly the functions this cell uses instead of relying on a wildcard
# import (which also shadows Python builtins such as sum/max/min).
from pyspark.sql.functions import col, count, explode, split

spark = SparkSession.builder.appName("WordCount").master("local").getOrCreate()      # Start Spark locally with an app name

try:
    # Method 1. Read text files into a DataFrame (one row per line, column 'value')
    # fileDF = spark.read.text("file_of_text.txt")

    # Method 2. Use a string to do word count
    data = [("hi lets do word count hi",)]
    fileDF = spark.createDataFrame(data, ["value"])                                  # Column named 'value' to match spark.read.text

    # Split each line on runs of whitespace (like str.split() in the RDD version),
    # flatten to one word per row, and drop the empty strings that leading or
    # trailing whitespace would otherwise produce.
    wordsDF = (
        fileDF.select(explode(split(col("value"), r"\s+")).alias("word"))
        .filter(col("word") != "")
    )
    countsDF = wordsDF.groupBy("word").agg(count("*").alias("count"))                # count how many times each word occurs

    # Can either show results or save them as CSV
    # countsDF.write.format("csv").save("results")                                   # Writes a directory of CSV part files

    countsDF.show()
finally:
    spark.stop()                                                                     # Stop the session even if a step above raises

+-----+-----+
| word|count|
+-----+-----+
|count|    1|
| lets|    1|
| word|    1|
|   do|    1|
|   hi|    2|
+-----+-----+

