# 8.2 Streaming Data
Name: Joi Chu-Ketterer <br>  Date: 2/2/20<br>Course: DSC650 - Big Data


In [1]:
# these packages allow me to create the streams
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark import SparkConf

# this allows me to access the files on my system 
import os 

# this allows me to copy the files into a new folder
import shutil

# this allows me to make the stream/query pause before proceeding
from time import sleep

In [2]:
# this checks the local directory
os.getcwd()

'/Users/jckett/Jupyter Notebooks'

In [3]:
# this is to define the schema 
file_path = '/Users/jckett/Jupyter Notebooks/baby-names/streaming/names_00.csv'

# this is so the stream knows where to pull from
directory = '/Users/jckett/Jupyter Notebooks/baby-names/streaming'

# this is the where the files are copied to, and the count is updated
streaming_path = '/Users/jckett/Jupyter Notebooks/input_streaming'

In [4]:
# this initiates the spark session and sets the schema
spark = SparkSession.builder.appName('FemaleCount').getOrCreate()
static = spark.read.csv(file_path, header = True)
dataschema = static.schema

# checks schema structure
static.printSchema()

root
 |-- state: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- count: string (nullable = true)



In [5]:
count_test = static.groupBy('sex').count()
count_test.show()

+---+-----+
|sex|count|
+---+-----+
|  F|33417|
|  M|26583|
+---+-----+



In [6]:
# this defines what the stream reads
streaming = spark.readStream.schema(dataschema).csv(directory) 

# this defines the action to take on the files during the stream
counts = streaming.groupBy("sex").count()

# this initiates the action
streamingquery = counts.writeStream.queryName("counts").format("memory").outputMode("complete").start()

# this checks to see if the streaming has started
print(counts.isStreaming)

True


In [7]:
# this selects all the files in the directory folder
file_name = os.listdir(directory)

# this displays the count after each file is copied from the directory
# to the input_stream folder

for i in range(len(file_name[1:11])):
    
    file = file_name[i]
    
    print(file)
    src_path = os.path.join(directory,file)
    dest_path = os.path.join(streaming_path,file)
    
    shutil.copy(src_path, dest_path) 
    
    spark.sql("SELECT * FROM counts \
                WHERE SEX == 'F'").show()
    
    sleep(10)


names_36.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_22.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_23.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_37.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_09.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_21.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_35.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_34.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_20.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_08.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+



__Micro-Batching__

Instead of continuous streaming, the files will be counted in small batches, triggered every 30 seconds. 

In [8]:
# this defines where the batch files will get copied to
batch_stream = '/Users/jckett/Jupyter Notebooks/batch_streaming'

# similar to before, this defines what the stream reads
batch_sc = spark.readStream.schema(dataschema).csv(batch_stream)
batch_counts = batch_sc.groupBy("sex").count()

# this defines how the stream operates and the limitations
microquery = batch_counts.writeStream.trigger(processingTime = "30 Seconds").queryName("batch_counts").format("memory").outputMode("complete").start()

print(microquery.isActive)

True


In [9]:
# this iterates through all the files in the original folder
# copies the files one by one into the new folder and calculates 
# new counts every 30 seconds

for i in range(len(file_name[1:10])):
    
    file = file_name[i]
    
    print(file)
    src_path = os.path.join(directory,file)
    dest_path = os.path.join(batch_stream,file)
    
    shutil.copy(src_path, dest_path) 
    
    spark.sql("SELECT * FROM counts \
                WHERE SEX == 'F'").show()
    
    sleep(10)

names_36.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_22.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_23.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_37.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_09.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_21.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_35.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_34.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

names_20.csv
+---+-----+
|sex|count|
+---+-----+
+---+-----+

