## Initialize a Spark Session

In [21]:
import findspark
findspark.init()
import pyspark
# Creating a SparkSession in Python
from pyspark.sql import SparkSession

# Local SparkSession for this streaming demonstration. Only settings that
# actually matter are configured (the Spark docs' "spark.some.config.option"
# placeholder configures nothing and is not set).
spark = (
    SparkSession.builder
    .master("local")
    .appName("Spark Streaming Demonstration")
    .getOrCreate()
)

# Keep the number of shuffle partitions small: the aggregations below run on
# small local data, so the default partition count mostly produces empty tasks.
spark.conf.set("spark.sql.shuffle.partitions", "2")

## Task 1: Simulate a stream using data read from files


1. Define input path

In [29]:
# Input path in the local filesystem: the directory of CSV files that both the
# static schema read and the streaming readers below load from (relative to the
# notebook's working directory).
inputPath = "./data"

2. Get schema from input files

In [30]:
# Batch-read the directory once purely to capture its column layout; the
# streaming readers reuse this schema because file-based streams need one
# supplied up front.
staticInputDF = spark.read.csv(inputPath)
schema = staticInputDF.schema

In [31]:
# Display the captured schema. No header option is set on the read, so Spark
# names the columns _c0 ... _c19 and types every one of them as a string.
schema

StructType([StructField('_c0', StringType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', StringType(), True), StructField('_c5', StringType(), True), StructField('_c6', StringType(), True), StructField('_c7', StringType(), True), StructField('_c8', StringType(), True), StructField('_c9', StringType(), True), StructField('_c10', StringType(), True), StructField('_c11', StringType(), True), StructField('_c12', StringType(), True), StructField('_c13', StringType(), True), StructField('_c14', StringType(), True), StructField('_c15', StringType(), True), StructField('_c16', StringType(), True), StructField('_c17', StringType(), True), StructField('_c18', StringType(), True), StructField('_c19', StringType(), True)])

3. Create `streaming_df` (the `Action` and `Time` columns are selected from it in the next step)

In [8]:
import pyspark.sql.functions as f

# Replay the CSV directory as a stream: reuse the schema captured from the
# static read and admit at most one new file per micro-batch, so the files
# arrive incrementally rather than all at once.
stream_reader = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1)
streaming_df = stream_reader.csv(inputPath)

4. Create `streaming_df_count` for counting trips by drop-off datetime 

In [6]:
# Import only what is needed: a wildcard import of pyspark.sql.functions
# shadows Python built-ins such as sum, min, max and abs.
from pyspark.sql.functions import window

# Project to a new frame instead of rebinding streaming_df; rebinding made the
# cell fail on re-run because _c0/_c3 no longer existed on the reassigned frame.
actions_df = streaming_df.select(f.col('_c0').alias('Action'), f.col('_c3').alias('Time'))

# Count rows per tumbling one-hour window of the Time column.
streaming_df_count = (
  actions_df
    .groupBy(window(actions_df.Time, "1 hour"))
    .count()
)

print('is process Counting streaming?', streaming_df_count.isStreaming)

is process Counting streaming? True


## Task 2: Create a query that aggregates the number of trips by drop-off datetime for each hour

In [10]:
# This query stores the aggregation results in an in-memory table named
# "counts", which is then read back with spark.sql and displayed.
query = (
  streaming_df_count
    .writeStream
    .format("memory")         # store results in an in-memory table
    .queryName("counts")      # name of the in-memory table
    .outputMode("complete")   # every trigger rewrites the full set of window counts
    .start()
)
try:
    # Wait until every file currently in inputPath has been processed. A fixed
    # awaitTermination(60) could stop the query before the last one-file
    # micro-batch ran, making the counts depend on machine speed.
    query.processAllAvailable()
finally:
    query.stop()

result = spark.sql('select * from counts order by window')

result.show(result.count(), truncate=False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-12-01 00:00:00, 2015-12-01 01:00:00}|7396 |
|{2015-12-01 01:00:00, 2015-12-01 02:00:00}|5780 |
|{2015-12-01 02:00:00, 2015-12-01 03:00:00}|3605 |
|{2015-12-01 03:00:00, 2015-12-01 04:00:00}|2426 |
|{2015-12-01 04:00:00, 2015-12-01 05:00:00}|2505 |
|{2015-12-01 05:00:00, 2015-12-01 06:00:00}|3858 |
|{2015-12-01 06:00:00, 2015-12-01 07:00:00}|10258|
|{2015-12-01 07:00:00, 2015-12-01 08:00:00}|19007|
|{2015-12-01 08:00:00, 2015-12-01 09:00:00}|23799|
|{2015-12-01 09:00:00, 2015-12-01 10:00:00}|24003|
|{2015-12-01 10:00:00, 2015-12-01 11:00:00}|21179|
|{2015-12-01 11:00:00, 2015-12-01 12:00:00}|20219|
|{2015-12-01 12:00:00, 2015-12-01 13:00:00}|20522|
|{2015-12-01 13:00:00, 2015-12-01 14:00:00}|20556|
|{2015-12-01 14:00:00, 2015-12-01 15:00:00}|21712|
|{2015-12-01 15:00:00, 2015-12-01 16:00:00}|22016|
|{2015-12-01 16:00:00, 2015-12-

6. Create a folder and a file to store the count for each interval

In [11]:
import os
from datetime import timedelta

ONE_HOUR = timedelta(hours=1)
# Suffix step between consecutive hourly output directories, kept as before.
# NOTE(review): one hour is 3,600,000 ms; if the suffix is meant to be the
# window end in milliseconds this should be 3600000 — confirm against the spec.
OUTPUT_STEP = 360000

# Key each hourly count by its window end so a file's index comes from the
# window itself. A positional counter shifted every later file whenever an
# hour had no rows (and therefore no window in `result`).
counts_by_end = {row['window']['end']: row['count'] for row in result.collect()}

if counts_by_end:
    # The earliest window is hour 1, matching the original numbering.
    first_start = min(counts_by_end) - ONE_HOUR
    n_hours = (max(counts_by_end) - first_start) // ONE_HOUR
    for hour in range(1, n_hours + 1):
        suffix = hour * OUTPUT_STEP
        newpath = f'./output-{suffix}'
        os.makedirs(newpath, exist_ok=True)
        with open(f"{newpath}/output-{suffix}.txt", "w") as file:
            # Hours with no trips have no window row; record them as 0.
            file.write(str(counts_by_end.get(first_start + hour * ONE_HOUR, 0)))

## Task 3: Create a query that counts the number of taxi trips each hour that drop off at either the Goldman Sachs headquarters or the Citigroup headquarters

In [151]:
import pyspark.sql.functions as f
import numpy as np 
from pyspark.sql.types import StructType,StructField, StringType


def filter_location(df, location):
    """Keep rows whose drop-off point lies inside a headquarters' bounding box.

    Each headquarters is described by four (longitude, latitude) corner
    points; the axis-aligned box spanning those corners is the filter region
    (bounds inclusive).

    Args:
        df: DataFrame (static or streaming) with ``dropoff_longitude`` and
            ``dropoff_latitude`` columns; string columns are cast to double.
        location: ``'goldman'`` or ``'citigroup'``.

    Returns:
        ``df`` restricted to the matching rows, with a literal ``Action``
        column set to ``location``.

    Raises:
        ValueError: if ``location`` is not a known headquarters name
            (previously any unknown name silently used the citigroup box).
    """
    corners = {
        'goldman': np.array([[-74.0141012, 40.7152191], [-74.013777, 40.7152275],
                             [-74.0141027, 40.7138745], [-74.0144185, 40.7140753]]),
        'citigroup': np.array([[-74.011869, 40.7217236], [-74.009867, 40.721493],
                               [-74.010140, 40.720053], [-74.012083, 40.720267]]),
    }
    if location not in corners:
        raise ValueError(f"unknown location {location!r}; expected one of {sorted(corners)}")

    # Column 0 of each corner is longitude, column 1 is latitude.
    box = corners[location]
    lon_min, lat_min = (float(v) for v in box.min(axis=0))
    lon_max, lat_max = (float(v) for v in box.max(axis=0))

    # The CSV schema reads every column as a string; cast explicitly so the
    # comparison is numeric instead of relying on implicit type coercion.
    lon = f.col('dropoff_longitude').cast('double')
    lat = f.col('dropoff_latitude').cast('double')
    in_box = lat.between(lat_min, lat_max) & lon.between(lon_min, lon_max)

    return df.filter(in_box).withColumn('Action', f.lit(location))

1. Get the streaming data 

In [152]:
import pyspark.sql.functions as f

# Fresh streaming reader over the same CSV directory and schema. No
# maxFilesPerTrigger limit is set here, so a micro-batch may take in every new file.
streaming_df = spark.readStream.schema(schema).csv(inputPath)

2. Select the drop-off longitude, latitude and time for yellow and green taxi records, then keep only the drop-offs that fall inside a headquarters' bounding box (`goldman`, `citigroup`)

In [153]:
def _project_dropoffs(taxi_type, lon_col, lat_col):
    """Rows of one taxi type, reduced to (dropoff_longitude, dropoff_latitude, Time)."""
    return (
        streaming_df
        .filter(f.col('_c0') == taxi_type)
        .select(
            f.col(lon_col).alias('dropoff_longitude'),
            f.col(lat_col).alias('dropoff_latitude'),
            f.col('_c3').alias('Time'),
        )
    )

# Yellow and green records keep their drop-off coordinates in different columns.
yellow_records_df = _project_dropoffs('yellow', '_c10', '_c11')
green_records_df = _project_dropoffs('green', '_c8', '_c9')

all_dropoffs_df = yellow_records_df.union(green_records_df)

# Tag drop-offs at each headquarters and stack both result sets.
df = filter_location(all_dropoffs_df, 'goldman').union(filter_location(all_dropoffs_df, 'citigroup'))

3. Create query

In [154]:
# Count drop-offs per headquarters (Action) per tumbling one-hour window.
# f.window is used so this cell does not depend on a wildcard import elsewhere.
streamingCount = (
    df
    .groupBy(
      df.Action,
      f.window(df.Time, "1 hour"))
    .count()
)

# This query stores the aggregation results in an in-memory table, reusing
# the table name "counts" from Task 2.
query = (
  streamingCount
    .writeStream
    .format("memory")         # store results in an in-memory table
    .queryName("counts")      # name of the in-memory table
    .outputMode("complete")   # every trigger rewrites the full set of counts
    .start()
)
try:
    # Process everything currently in inputPath instead of waiting a fixed
    # 60 s, which could stop the query before all input was consumed.
    query.processAllAvailable()
finally:
    query.stop()

result = spark.sql('select * from counts order by window')

In [155]:
# Hourly counts for drop-offs inside the Goldman Sachs bounding box.
goldman_res = result.where(result.Action == 'goldman')
goldman_res.show(truncate=False)

+-------+------------------------------------------+-----+
|Action |window                                    |count|
+-------+------------------------------------------+-----+
|goldman|{2015-12-01 05:00:00, 2015-12-01 06:00:00}|8    |
|goldman|{2015-12-01 06:00:00, 2015-12-01 07:00:00}|28   |
|goldman|{2015-12-01 07:00:00, 2015-12-01 08:00:00}|44   |
|goldman|{2015-12-01 08:00:00, 2015-12-01 09:00:00}|59   |
|goldman|{2015-12-01 09:00:00, 2015-12-01 10:00:00}|72   |
|goldman|{2015-12-01 10:00:00, 2015-12-01 11:00:00}|58   |
|goldman|{2015-12-01 11:00:00, 2015-12-01 12:00:00}|34   |
|goldman|{2015-12-01 12:00:00, 2015-12-01 13:00:00}|26   |
|goldman|{2015-12-01 13:00:00, 2015-12-01 14:00:00}|19   |
|goldman|{2015-12-01 14:00:00, 2015-12-01 15:00:00}|31   |
|goldman|{2015-12-01 15:00:00, 2015-12-01 16:00:00}|22   |
|goldman|{2015-12-01 16:00:00, 2015-12-01 17:00:00}|12   |
|goldman|{2015-12-01 17:00:00, 2015-12-01 18:00:00}|3    |
|goldman|{2015-12-01 18:00:00, 2015-12-01 19:00:00}|5   

In [156]:
# Hourly counts for drop-offs inside the Citigroup bounding box.
citigroup_res = result.where(result.Action == 'citigroup')
citigroup_res.show(truncate=False)

+---------+------------------------------------------+-----+
|Action   |window                                    |count|
+---------+------------------------------------------+-----+
|citigroup|{2015-12-01 00:00:00, 2015-12-01 01:00:00}|6    |
|citigroup|{2015-12-01 01:00:00, 2015-12-01 02:00:00}|2    |
|citigroup|{2015-12-01 02:00:00, 2015-12-01 03:00:00}|2    |
|citigroup|{2015-12-01 03:00:00, 2015-12-01 04:00:00}|2    |
|citigroup|{2015-12-01 04:00:00, 2015-12-01 05:00:00}|1    |
|citigroup|{2015-12-01 05:00:00, 2015-12-01 06:00:00}|11   |
|citigroup|{2015-12-01 06:00:00, 2015-12-01 07:00:00}|70   |
|citigroup|{2015-12-01 07:00:00, 2015-12-01 08:00:00}|95   |
|citigroup|{2015-12-01 08:00:00, 2015-12-01 09:00:00}|76   |
|citigroup|{2015-12-01 09:00:00, 2015-12-01 10:00:00}|75   |
|citigroup|{2015-12-01 10:00:00, 2015-12-01 11:00:00}|33   |
|citigroup|{2015-12-01 11:00:00, 2015-12-01 12:00:00}|27   |
|citigroup|{2015-12-01 12:00:00, 2015-12-01 13:00:00}|35   |
|citigroup|{2015-12-01 1

4. Create a folder and an output file for each hourly window

In [162]:
import os
from datetime import timedelta

ONE_HOUR = timedelta(hours=1)
# Suffix step between consecutive hourly output directories, kept as before.
# NOTE(review): one hour is 3,600,000 ms; if the suffix is meant to be the
# window end in milliseconds this should be 3600000 — confirm against the spec.
# NOTE(review): these ./output-N paths are the same ones the Task 2 writer
# uses, so running this cell overwrites the Task 2 files.
OUTPUT_STEP = 360000

# Align the two headquarters by window end. Pairing rows positionally with
# zip() mismatched hours (the goldman results start at 05:00 while citigroup
# starts at 00:00) and dropped the tail of the longer result.
goldman_by_end = {row['window']['end']: row['count'] for row in goldman_res.collect()}
citigroup_by_end = {row['window']['end']: row['count'] for row in citigroup_res.collect()}
window_ends = set(goldman_by_end) | set(citigroup_by_end)

if window_ends:
    # The earliest window across both headquarters is hour 1.
    first_start = min(window_ends) - ONE_HOUR
    n_hours = (max(window_ends) - first_start) // ONE_HOUR
    for hour in range(1, n_hours + 1):
        window_end = first_start + hour * ONE_HOUR
        suffix = hour * OUTPUT_STEP
        newpath = f'./output-{suffix}'
        os.makedirs(newpath, exist_ok=True)
        with open(f"{newpath}/output-{suffix}.txt", "w") as file:
            # An hour with no drop-offs at a headquarters has no row; write 0.
            file.write(f"goldman: {goldman_by_end.get(window_end, 0)}\n"
                       f"citigroup: {citigroup_by_end.get(window_end, 0)}")