In [1]:
from pyspark.sql import SparkSession

# Single SparkSession shared by the RDD and DataFrame sections; `sc` is its SparkContext.
spark = (
    SparkSession.builder
    .appName("Week 11")
    .getOrCreate()
)
sc = spark.sparkContext

In [2]:
# Small (key, value) RDD: key "A" has three values, key "B" has two.
sample_data = sc.parallelize([
    ("A", 2), ("A", 1), ("A", 3),
    ("B", 3), ("B", 2),
])

## Group By Key

In [10]:

import builtins

# groupByKey gathers every value for a key into one iterable; mapValues then sums each group.
# builtins.sum is used explicitly because a later cell star-imports pyspark.sql.functions,
# whose `sum` (a Column function) shadows Python's and would break a re-run of this cell.
sample_groupByKey = sample_data.groupByKey().mapValues(builtins.sum)
sample_groupByKey.collect()

[('A', 6), ('B', 5)]


In [116]:
 sample_data.groupByKey().take(1)

[('A', <pyspark.resultiterable.ResultIterable at 0x7f1fbf198990>)]

## Reduce By Key

In [46]:
from operator import add

# reduceByKey needs a binary function f(a, b); try passing builtin `sum` instead to see it fail,
# since sum() expects a single iterable rather than two values.
sample_reduceBykey = sample_data.reduceByKey(add)
sample_reduceBykey.collect()

[('A', 6), ('B', 5)]

## Combine By Key

In [47]:
# combineByKey with an identity combiner and `add` for both merge steps reproduces reduceByKey(add).
sample_combineByKey = sample_data.combineByKey(
    createCombiner=lambda first_value: first_value,
    mergeValue=add,
    mergeCombiners=add,
)
sample_combineByKey.collect()

[('A', 6), ('B', 5)]

In [48]:
def to_list(a):
    """createCombiner for combineByKey: wrap the first value seen for a key in a new list.

    Prints a trace line and the value's type so you can see when Spark calls it.
    """
    for message in ("to list", type(a)):
        print(message)
    return [a]

In [49]:
def append(a, b):
    """mergeValue for combineByKey: add one value `b` to accumulator list `a`.

    Mutates and returns `a`; prints a trace line plus both argument types.
    """
    print("append", type(a), type(b), sep="\n")
    a.append(b)
    return a

In [50]:
def extend(a, b):
    """mergeCombiners for combineByKey: concatenate partial list `b` onto list `a`.

    Mutates and returns `a`; prints a trace line plus both argument types.
    """
    trace = "\n".join(["extend", str(type(a)), str(type(b))])
    print(trace)
    a.extend(b)
    return a

In [51]:
# Group values into per-key lists using the three traced helpers defined above.
sample_data.combineByKey(
    createCombiner=to_list,
    mergeValue=append,
    mergeCombiners=extend,
).collect()

[('A', [2, 1, 3]), ('B', [3, 2])]

## Window Functions

In [67]:
from pyspark.sql import Window

# Explicit imports instead of `from ... import *`, so it is clear where each name comes from.
# NOTE: `sum`, `min` and `max` here are Spark Column functions and intentionally shadow
# Python's builtins for the window-aggregate cells below; use `builtins.sum` etc. when the
# Python versions are needed.
from pyspark.sql.functions import (
    avg,
    col,
    cume_dist,
    dense_rank,
    lag,
    lead,
    max,
    min,
    rank,
    row_number,
    sum,
)

In [61]:
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# Employee schema: (column name, Spark type); every column is nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ('employee_name', StringType()),
        ('department', StringType()),
        ('salary', IntegerType()),
    )
])

In [62]:

# Sample employee rows (employee_name, department, salary). Defined here so the cell
# runs on a fresh kernel -- `simpleData` was not defined anywhere else in the notebook.
simpleData = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]

# createDataFrame accepts a local list directly; no need to parallelize into an RDD first.
df = spark.createDataFrame(simpleData, schema)
df.show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|        James|     Sales|  3000|
|      Michael|     Sales|  4600|
|       Robert|     Sales|  4100|
|        Maria|   Finance|  3000|
|        James|     Sales|  3000|
|        Scott|   Finance|  3300|
|          Jen|   Finance|  3900|
|         Jeff| Marketing|  3000|
|        Kumar| Marketing|  2000|
|         Saif|     Sales|  4100|
+-------------+----------+------+



## 2. Spark Window Ranking functions

### 2.1 row_number Window Function

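`row_number()` assigns sequential numbers, starting at 1, to the rows of each window partition in the window's sort order. Rows with equal sort values (e.g. the two James/3000 rows) still get distinct numbers, in no guaranteed order.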

In [73]:
from pyspark.sql.functions import col, row_number

# Window specification (not a frame): one partition per department, rows sorted by salary.
# Reused by the ranking and analytic cells below.
windowSpec = Window.partitionBy("department").orderBy("salary")
(
    df
    .withColumn("row_number", row_number().over(windowSpec))
    .show()
)

+-------------+----------+------+----------+
|employee_name|department|salary|row_number|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         2|
|       Robert|     Sales|  4100|         3|
|         Saif|     Sales|  4100|         4|
|      Michael|     Sales|  4600|         5|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+



### 2.2 rank Window Function
`rank()` assigns each row its rank within its window partition. Tied rows share a rank, and the ranks after a tie are skipped, leaving gaps (e.g. 1, 1, 3).

In [75]:
# Tied salaries share a rank; the next rank is skipped (1, 1, 3, ...).
(
    df
    .withColumn("rank", rank().over(windowSpec))
    .show()
)

+-------------+----------+------+----+
|employee_name|department|salary|rank|
+-------------+----------+------+----+
|        James|     Sales|  3000|   1|
|        James|     Sales|  3000|   1|
|       Robert|     Sales|  4100|   3|
|         Saif|     Sales|  4100|   3|
|      Michael|     Sales|  4600|   5|
|        Maria|   Finance|  3000|   1|
|        Scott|   Finance|  3300|   2|
|          Jen|   Finance|  3900|   3|
|        Kumar| Marketing|  2000|   1|
|         Jeff| Marketing|  3000|   2|
+-------------+----------+------+----+



### 2.3 dense_rank Window Function
`dense_rank()` also ranks rows within a window partition, but without gaps: tied rows share a rank and the next distinct value gets the next consecutive rank (e.g. 1, 1, 2). The only difference from `rank()` is that `rank()` leaves gaps after ties.

In [77]:
 df.withColumn("dense_rank",dense_rank().over(windowSpec))\
    .show()

+-------------+----------+------+----------+
|employee_name|department|salary|dense_rank|
+-------------+----------+------+----------+
|        James|     Sales|  3000|         1|
|        James|     Sales|  3000|         1|
|       Robert|     Sales|  4100|         2|
|         Saif|     Sales|  4100|         2|
|      Michael|     Sales|  4600|         3|
|        Maria|   Finance|  3000|         1|
|        Scott|   Finance|  3300|         2|
|          Jen|   Finance|  3900|         3|
|        Kumar| Marketing|  2000|         1|
|         Jeff| Marketing|  3000|         2|
+-------------+----------+------+----------+



## 3. Spark Window Analytic functions

### 3.1 cume_dist Window Function
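`cume_dist()` returns the cumulative distribution of values within a window partition: for each row, the fraction of rows in the partition whose sort value is less than or equal to the current row's value (e.g. 2 of the 5 Sales rows have salary ≤ 3000, giving 0.4).

This is the same as the CUME_DIST function in SQL.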

In [78]:
 df.withColumn("cume_dist",cume_dist().over(windowSpec))\
      .show()

+-------------+----------+------+------------------+
|employee_name|department|salary|         cume_dist|
+-------------+----------+------+------------------+
|        James|     Sales|  3000|               0.4|
|        James|     Sales|  3000|               0.4|
|       Robert|     Sales|  4100|               0.8|
|         Saif|     Sales|  4100|               0.8|
|      Michael|     Sales|  4600|               1.0|
|        Maria|   Finance|  3000|0.3333333333333333|
|        Scott|   Finance|  3300|0.6666666666666666|
|          Jen|   Finance|  3900|               1.0|
|        Kumar| Marketing|  2000|               0.5|
|         Jeff| Marketing|  3000|               1.0|
+-------------+----------+------+------------------+



### 3.2 lag and lead Window Function

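`lag` and `lead` are used to compare a row with its neighbours. The values they return depend on the window's ordering.

`lag(col, n)` returns the value of `col` from the row `n` positions before the current row, and `lead(col, n)` returns the value from the row `n` positions after it. Both return null when no such row exists in the partition.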

In [79]:
# lag("salary", 2): salary from two rows earlier in the same department (null when no such row).
# The comment must not follow a `\` line continuation -- that is a SyntaxError.
df.withColumn("lag", lag("salary", 2).over(windowSpec)).show()

+-------------+----------+------+----+
|employee_name|department|salary| lag|
+-------------+----------+------+----+
|        James|     Sales|  3000|null|
|        James|     Sales|  3000|null|
|       Robert|     Sales|  4100|3000|
|         Saif|     Sales|  4100|3000|
|      Michael|     Sales|  4600|4100|
|        Maria|   Finance|  3000|null|
|        Scott|   Finance|  3300|null|
|          Jen|   Finance|  3900|3000|
|        Kumar| Marketing|  2000|null|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+



In [81]:
# lead("salary", 1): the next row's salary within the department (null on the last row).
(
    df
    .withColumn("lead", lead("salary", 1).over(windowSpec))
    .show()
)

+-------------+----------+------+----+
|employee_name|department|salary|lead|
+-------------+----------+------+----+
|        James|     Sales|  3000|3000|
|        James|     Sales|  3000|4100|
|       Robert|     Sales|  4100|4100|
|         Saif|     Sales|  4100|4600|
|      Michael|     Sales|  4600|null|
|        Maria|   Finance|  3000|3300|
|        Scott|   Finance|  3300|3900|
|          Jen|   Finance|  3900|null|
|        Kumar| Marketing|  2000|3000|
|         Jeff| Marketing|  3000|null|
+-------------+----------+------+----+



## 4. Spark Window Aggregate Functions

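Any existing aggregate function (`avg`, `sum`, `min`, `max`, …) can be used as a window function. When the window has no `orderBy`, the frame is the whole partition, so every row receives the aggregate over its entire department.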

In [95]:
# Partition-only window (no orderBy): each row's frame is its whole department.
windowSpecAgg = Window.partitionBy(["department"])

In [96]:
from pyspark.sql import functions as F

# Attach per-department aggregates to every row, then collapse to one row per department.
# F.* is used so this cell does not depend on a star import shadowing Python's sum/min/max.
aggDF = (
    df
    .withColumn("avg", F.avg(F.col("salary")).over(windowSpecAgg))
    .withColumn("sum", F.sum(F.col("salary")).over(windowSpecAgg))
    .withColumn("min", F.min(F.col("salary")).over(windowSpecAgg))
    .withColumn("max", F.max(F.col("salary")).over(windowSpecAgg))
    .select("department", "avg", "sum", "min", "max")
    .distinct()
)
# .show() returns None, so it is called separately to keep the DataFrame in aggDF.
aggDF.show()

+----------+------+-----+----+----+
|department|   avg|  sum| min| max|
+----------+------+-----+----+----+
|     Sales|3760.0|18800|3000|4600|
|   Finance|3400.0|10200|3000|3900|
| Marketing|2500.0| 5000|2000|3000|
+----------+------+-----+----+----+



# Spark Streaming

In [1]:
# Example adapted from
# https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
# NOTE: DStreams are Spark's legacy streaming API; Structured Streaming is its successor.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Only one SparkContext can be active at a time, and the SparkSession created at the top of
# this notebook already owns one. Stop it first so this cell also works on a plain
# "Run All" instead of requiring a kernel restart.
# (_active_spark_context is a private PySpark attribute.)
_existing_sc = SparkContext._active_spark_context
if _existing_sc is not None:
    _existing_sc.stop()

# Local StreamingContext with two worker threads (the socket receiver occupies one, leaving
# one to process batches) and a batch interval of 1 second.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

In [2]:
# Socket source address. Start a text server first, e.g. `nc -lk 8899` in a terminal.
STREAM_HOST = "127.0.0.1"
STREAM_PORT = 8899

# Create a DStream of text lines received from STREAM_HOST:STREAM_PORT.
lines = ssc.socketTextStream(STREAM_HOST, STREAM_PORT)

In [3]:
# Tokenize: every received line is split on single spaces into words.
words = lines.flatMap(lambda text: text.split(" "))

In [4]:
# Per-batch word count: pair each word with 1, then add up the 1s for each word.
pairs = words.map(lambda token: (token, 1))
wordCounts = pairs.reduceByKey(lambda left, right: left + right)

# Print the first ten elements of every RDD this DStream produces.
wordCounts.pprint(10)

In [None]:
ssc.start()  # start receiving and processing batches

# awaitTermination() with no timeout blocks this cell (and the kernel) until the stream is
# stopped externally, so the notebook can never finish. Bound the wait instead.
STREAM_TIMEOUT_SECONDS = 60
ssc.awaitTermination(STREAM_TIMEOUT_SECONDS)

# Finish processing received data, but keep `sc` alive for any later cells.
ssc.stop(stopSparkContext=False, stopGraceFully=True)