<a href="https://colab.research.google.com/github/masfworld/sparkstreaming_datahack_webinar/blob/master/SparkStreaming_Datahack.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Spark Streaming


<a href="https://datahack.es">
  <img src="https://www.datahack.es/wp-content/uploads/2019/10/Todo_vertical.png" alt="datahack" width="10%">
</a>
  
  
**Author**:
miguel.sotomayor@sidesna.es\
https://www.linkedin.com/in/miguelsotomayorf/ \
https://github.com/masfworld





# Prerequisites

Install Spark, the Apache Kafka libraries and the Python dependencies in the VM


---



In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install py4j

# For maps
!pip install folium
!pip install plotly

In [0]:
# Download the Kafka connector jars (zipped by the author) into Spark's jars directory.
!wget -q https://github.com/masfworld/datahack_docker/raw/master/zeppelin/libs/libs-kafka.zip --directory-prefix=/content/spark-2.4.5-bin-hadoop2.7/jars/
# -n: never overwrite files that already exist, so re-running the cell does not prompt.
!unzip -n /content/spark-2.4.5-bin-hadoop2.7/jars/libs-kafka.zip -d /content/spark-2.4.5-bin-hadoop2.7/jars/
# List the Kafka-related jars to confirm the extraction worked.
!ls /content/spark-2.4.5-bin-hadoop2.7/jars/*kafka*

Define the environment (Java & Spark homes)

---

In [0]:
import os

# Point PySpark at the JDK installed by apt and at the unpacked Spark distribution.
# PYSPARK_SUBMIT_ARGS starts empty; findspark.add_packages/add_jars in the next cell extend it.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
    "PYSPARK_SUBMIT_ARGS": "",
})

Start the Spark session and print its version


---


In [0]:
import findspark

# Absolute location of the Spark distribution unpacked in the install cell (same as SPARK_HOME).
spark_home = "/content/spark-2.4.5-bin-hadoop2.7"
spark_jars_dir = spark_home + "/jars"

# Kafka connector resolved via --packages ...
findspark.add_packages(["org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5"])
# ... plus the jars unzipped from libs-kafka.zip, passed explicitly via --jars.
kafka_jar_names = [
    "kafka-clients-2.0.0.jar",
    "lz4-java-1.4.1.jar",  # previously "lz4-java-1.4.1-jar" (missing '.'), a path that never matches
    "scala-library-2.11.12.jar",
    "slf4j-api-1.7.25.jar",
    "snappy-java-1.1.7.1.jar",
    "spark-sql-kafka-0-10_2.11-2.4.5.jar",
    "spark-tags_2.11-2.4.5.jar",
    "unused-1.0.0.jar",
]
findspark.add_jars([spark_jars_dir + "/" + jar_name for jar_name in kafka_jar_names])
# Absolute path, so the cell does not depend on the current working directory being /content.
findspark.init(spark_home)

from pyspark.sql import SparkSession

# Local session using all cores. The UI runs on 4050, the port tunnelled by the optional ngrok
# cell (ngrok's own local API listens on 4040).
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

spark.version

In [0]:
# Rich display of the session (version, master, app name and a link to the Spark UI).
spark

In [0]:
# For Pandas conversion optimization: let toPandas() transfer data with Apache Arrow.
# This is the Spark 2.4 config key (Spark 3.0 renamed it to spark.sql.execution.arrow.pyspark.enabled).
# NOTE(review): requires a pyarrow version compatible with Spark 2.4 on the driver — confirm on Colab.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Create an ngrok tunnel to expose the Spark UI (optional)


In [0]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

# Streaming from a directory

In [0]:
# Peek at the header and first rows of the sample dataset shipped with Colab.
!head /content/sample_data/california_housing_train.csv

In [0]:
# Create the directory watched by the file stream below and seed it with the training CSV.
!mkdir -p /content/california_housing
!cp /content/sample_data/california_housing_train.csv /content/california_housing/
!ls /content/california_housing/

In [0]:
from pyspark.sql.types import StructType

# Every column of the California housing CSV is numeric, so all are declared as doubles.
# Streaming file sources need an explicit schema (schema inference is off for streams by default).
housingSchema = StructType()
for housing_column in ("longitude", "latitude", "housing_median_age", "total_rooms",
                       "total_bedrooms", "population", "households", "median_income",
                       "median_house_value"):
    housingSchema.add(housing_column, "double")

# Watch the directory: each CSV file that appears there (written atomically) becomes new input rows.
housing_df = (
    spark.readStream
    .schema(housingSchema)
    .option("header", "true")
    .option("sep", ",")
    .csv("/content/california_housing/")
)


In [0]:
from pyspark.sql.functions import col, avg

# Global (ungrouped) running average of total_bedrooms over every row the stream has seen.
housing_avg = housing_df.groupBy().agg(avg(col("total_bedrooms")))

In [0]:
# Starting a query whose name is already active raises, so stop a previous run of this cell first.
for active_query in spark.streams.active:
    if active_query.name == "housing_avg":
        active_query.stop()

# Keep the StreamingQuery handle (the original stored only the DataStreamWriter and discarded it).
# "complete" mode rewrites the whole aggregate into the in-memory table "housing_avg" on each trigger.
housing_avg_query = (
    housing_avg.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("housing_avg")
    .start()
)

# Block until the files already in the directory are processed, so the next cell sees a result
# instead of a possibly empty table.
housing_avg_query.processAllAvailable()

In [0]:
# Current content of the in-memory table written by the housing_avg query.
spark.table("housing_avg").show()

In [0]:
!cp /content/sample_data/california_housing_test.csv /content/california_housing/

In [0]:
# Same table again: the average now also covers the rows of the newly added file.
spark.table("housing_avg").show()

# Prerequisites for Spark-Kafka

Define the schema of the JSON messages read from Kafka

---



In [0]:
from pyspark.sql.types import StringType, StructField, StructType

# Fields of the JSON document carried in each Kafka message value (presumably NYC 311 service
# requests, judging by the field names). Order matters: it is the column order after parsing.
# Every field is a nullable string; conversions (dates, coordinates) are applied downstream.
call_fields = [
    'id', 'created_date', 'closed_date', 'agency', 'agency_name', 'complaint_type',
    'descriptor', 'location_type', 'incident_zip', 'incident_address', 'street_name',
    'cross_street_1', 'cross_street_2', 'intersection_street1', 'intersection_street2',
    'address_type', 'city', 'landmark', 'facility_type', 'status', 'due_date',
    'resolution_description', 'resolution_action_update_date', 'community_board', 'bbl',
    'borough', 'x_coordinate_state_plane', 'y_coordinate_state_plane', 'open_data_channel_type',
    'park_facility_name', 'park_borough', 'vehicle_type', 'taxi_company_borough',
    'taxi_pick_up_location', 'bridge_highway_name', 'bridge_highway_direction', 'road_ramp',
    'bridge_highway_segment', 'latitude', 'longitude', 'location',
]

schema = StructType([StructField(field_name, StringType(), True) for field_name in call_fields])

# Streaming From Kafka

Connecting with Kafka

---



In [0]:
# Subscribe to the "calls" topic of the demo broker. Kafka rows expose key/value as binary plus
# metadata columns (topic, partition, offset, timestamp, timestampType).
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("subscribe", "calls")
    .option("kafka.bootstrap.servers", "ec2-34-229-22-148.compute-1.amazonaws.com:9092")
    .load()
)

In [0]:
# printSchema is a method: without the parentheses the cell only displays the bound-method repr.
kafka_df.printSchema()

In [0]:
from pyspark.sql.functions import from_json, col

# Cast the binary key/value to strings, parse the JSON value with `schema`, then flatten the
# parsed struct next to the Kafka message timestamp (the key is dropped by the final select).
dataset_calls = (
    kafka_df
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .withColumn("value", from_json(col("value"), schema))
    .select("timestamp", "value.*")
)

Write the stream to an in-memory table. **Just for development!** The memory sink keeps every received row in the driver's memory.

In [0]:
# Starting a query whose name is already active raises, so stop a previous run of this cell first.
# Note: the restarted query writes to a fresh in-memory table.
for active_query in spark.streams.active:
    if active_query.name == "test_kafka_datahack":
        active_query.stop()

# Append every parsed call to the in-memory table "test_kafka_datahack" (development only).
calls_memory_query = (
    dataset_calls.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("test_kafka_datahack")
    .start()
)
calls_memory_query

In [0]:
# Batch DataFrame over the in-memory table; each action re-reads the rows received so far.
# NOTE(review): `df` is not used by any later cell — candidate for removal.
df = spark.table("test_kafka_datahack")

First query in (near) real time (NRT)

In [0]:
# First five rows currently stored in the in-memory table.
spark.table("test_kafka_datahack").limit(5).show()

Total by complaint type


---



In [0]:
from pyspark.sql.functions import desc

# Batch view over the in-memory table; every action re-reads the rows received so far.
df_all = spark.table("test_kafka_datahack")

# Number of calls per complaint type, most frequent first.
df_complaint = df_all.groupBy("complaint_type").count().sort(desc("count"))

df_complaint.show(truncate=False)

Windowing


---



In [0]:
from pyspark.sql.functions import window

# Calls per location type in tumbling 5-minute windows over the Kafka message timestamp.
windowedCounts = df_all.groupBy(window("timestamp", "5 minutes"), "location_type").count()

windowedCounts.orderBy(desc("count")).show(truncate=False)

# Multiple plots

Pie by complaint type

---

In [0]:
import plotly.express as px

# Share of calls per complaint type: aggregated in Spark, collected to pandas for plotting.
complaint_counts_pd = df_complaint.toPandas()
fig = px.pie(complaint_counts_pd, names='complaint_type', values='count', title='Calls per Complaint type')
fig.show()

Pie per borough


---



In [0]:
# Share of calls per borough.
borough_counts = df_all.groupBy("borough").count()
pie_chart_pd = borough_counts.toPandas()

fig = px.pie(pie_chart_pd, names='borough', values='count', title='Calls per borough')
fig.show()

Line chart per date and borough


---



In [0]:
from pyspark.sql.functions import date_format, desc, from_unixtime, unix_timestamp

# Day (yyyy-MM-dd) on which each call was created; created_date is parsed with the pattern
# 'MM/dd/yyyy hh:mm:ss aa'. Use "yyyy" (calendar year): the previous "YYYY" is the week-based
# year and labels late-December dates with the following year.
# The column keeps its name "hour_created" because the plotting cell reads it, although it holds
# the day, not the hour.
df_hour_created = df_all.withColumn(
    'hour_created',
    date_format(from_unixtime(unix_timestamp('created_date', 'MM/dd/yyyy hh:mm:ss aa')), "yyyy-MM-dd"),
)

# Calls per day and borough, in chronological order (used by the line chart).
df_hour_group = (
    df_hour_created
    .groupBy("hour_created", "borough")
    .count()
    .orderBy("hour_created")
)

# Busiest (day, borough) pairs first.
df_hour_group.orderBy(desc("count")).show()

In [0]:
# One line per borough: daily call counts collected to pandas.
line_chart_pd = df_hour_group.toPandas()

fig = px.line(data_frame=line_chart_pd, color='borough', x="hour_created", y="count")
fig.show()

...With Windowing

In [0]:
# Calls per borough in tumbling 10-minute windows over the Kafka message timestamp.
windowedCounts_borough = (
    df_all
    .groupBy(window("timestamp", "10 minutes"), "borough")
    .count()
)

windowedCounts_borough.orderBy(desc("count")).show(truncate=False)

In [0]:
# Flatten the window struct to its start time so it can serve as the x axis.
line_chart_window_pd = (
    windowedCounts_borough
    .select(col("window.start").alias("start"), "borough", "count")
    .orderBy("start")
    .toPandas()
)

fig_borough_window = px.line(data_frame=line_chart_window_pd, color='borough', x="start", y="count")
fig_borough_window.show()

# Maps

In [0]:
# Call ids with their coordinates (strings in the JSON) cast to float, collected for mapping.
postitions_pd = df_all.select(
    "id",
    col("latitude").cast('float'),
    col("longitude").cast('float'),
).toPandas()

In [0]:
import os

import plotly.graph_objects as go

# The Mapbox token comes from the environment instead of being hard-coded in the notebook.
# NOTE(review): the token previously committed here is public in the repository history — rotate it.
# Without a token, fall back to the OpenStreetMap tiles, which plotly renders without one.
mapbox_access_token = os.environ.get("MAPBOX_ACCESS_TOKEN")

fig = go.Figure(go.Scattermapbox(
    lat=postitions_pd['latitude'],
    lon=postitions_pd['longitude'],
    mode='markers',
    marker=go.scattermapbox.Marker(size=9),
))

mapbox_layout = dict(
    bearing=0,
    center=dict(lat=40.77, lon=-73.9710),  # around Manhattan, New York City
    pitch=0,
    zoom=10,
)
if mapbox_access_token:
    mapbox_layout["accesstoken"] = mapbox_access_token
else:
    mapbox_layout["style"] = "open-street-map"

fig.update_layout(autosize=True, hovermode='closest', mapbox=mapbox_layout)

fig.show()

Another Map


---



In [0]:
# Fresh snapshot of call coordinates for the folium map (the in-memory table keeps growing).
postitions_pd_new = (
    df_all
    .select("id", col("latitude").cast('float'), col("longitude").cast('float'))
    .toPandas()
)

In [0]:
import folium
from folium.plugins import MarkerCluster

# Keep only calls with usable coordinates: failed float casts / missing fields arrive as NaN and
# folium rejects NaN locations. Sample up to 1000 calls without replacement (no duplicate markers)
# and with a fixed seed so the map is reproducible.
located_calls = postitions_pd_new.dropna(subset=['latitude', 'longitude'])
subset_of_df = located_calls.sample(n=min(1000, len(located_calls)), random_state=42)

if subset_of_df.empty:
    # No located calls yet: center on the same point as the plotly map instead of a NaN mean.
    map_center = [40.77, -73.9710]
else:
    map_center = [subset_of_df['latitude'].mean(), subset_of_df['longitude'].mean()]

some_map = folium.Map(location=map_center, zoom_start=10)
mc = MarkerCluster()
# One marker per sampled call; its popup shows the call id.
for row in subset_of_df.itertuples():
    mc.add_child(folium.Marker(location=[row.latitude, row.longitude], popup=row.id))

some_map.add_child(mc)
some_map