# Setup

In [1]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col

# Kafka connector for Structured Streaming. The artifact's Scala (2.11) and
# Spark (2.4.4) versions must match the Spark build that runs this notebook.
packages = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4"

# Hand the extra package to the PySpark launcher; this has to be in place
# before the SparkSession below is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(
    ["--packages", packages, "pyspark-shell"]
)

def logLevel(spark):
    """Quiet Spark's JVM-side logging and emit one warning through log4j.

    The log4j logger named ``"org"`` is set to ERROR level, then a
    ``"Custom Warning"`` message is logged through a logger named after
    this module.

    REF: https://stackoverflow.com/questions/25193488/how-to-turn-off-info-logging-in-spark

    Parameters
    ----------
    spark : SparkSession
        Session whose JVM gateway is used to reach log4j.
    """
    log4j = spark.sparkContext._jvm.org.apache.log4j
    org_logger = log4j.Logger.getLogger("org")
    org_logger.setLevel(log4j.Level.ERROR)
    log4j.LogManager.getLogger(__name__).warn("Custom Warning")


# Create (or reuse) a local session named "Demo" that uses all local cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Demo")
    .getOrCreate()
)

# Silence the JVM "org" loggers through log4j ...
logLevel(spark)

# ... and also cap the SparkContext log level at ERROR.
spark.sparkContext.setLogLevel("ERROR")

In [4]:
import pyspark.sql.functions as psf


def parseJSONCols(df, *cols, sanitize=True):
    """Auto infer the schema of a json column and parse into a struct.

    rdd-based schema inference works if you have well-formatted JSON,
    like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
    string (``"data"``) or is an array (``[1, 2, 3]``). In those cases you
    can fix everything by wrapping the data in another JSON object
    (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
    automatically performs the wrapping and unwrapping.

    The schema inference is based on this
    `SO Post <https://stackoverflow.com/a/45880574>`_.

    Parameters
    ----------
    df : pyspark dataframe
        Dataframe containing the JSON cols.
    *cols : string(s)
        Names of the columns containing JSON.
    sanitize : boolean
        Flag indicating whether you'd like to sanitize your records
        by wrapping and unwrapping them in another JSON object layer.

    Returns
    -------
    pyspark dataframe
        A dataframe with the decoded columns.

    Notes
    -----
    Schema inference uses the notebook-level ``spark`` session and triggers
    a Spark job per column, so ``df`` must be a batch (non-streaming)
    dataframe. Null values are skipped during inference and stay null in
    the parsed output.
    """
    res = df
    for name in cols:
        # sanitize if requested.
        if sanitize:
            res = res.withColumn(
                name,
                psf.concat(psf.lit('{"data": '), name, psf.lit('}'))
            )

        # Infer the schema from the non-null strings of this column only.
        # A null (including the null that concat yields for a null input)
        # would reach the JSON reader as Python None, get stringified, and
        # can add a `_corrupt_record` field to the inferred schema.
        # Selecting the single column also avoids shipping every other
        # column of each Row through the Python RDD.
        json_strings = (
            res.select(psf.col(name).alias("json"))
            .where(psf.col("json").isNotNull())
            .rdd
            .map(lambda row: row[0])
        )
        schema = spark.read.json(json_strings).schema
        res = res.withColumn(name, psf.from_json(psf.col(name), schema))

        # unpack the wrapped object if needed
        if sanitize:
            res = res.withColumn(name, psf.col(name).data)
    return res

# Read stream of data

In [5]:
# Subscribe to the "twitter_tweets" Kafka topic on localhost:9092, starting
# from the latest offsets, and expose each message value as a string column
# named `tweets`.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("startingOffsets", "latest")
    .option("subscribe", "twitter_tweets")
    .load()
    .selectExpr("CAST(value AS STRING) as tweets")
)

In [6]:
df.printSchema()

root
 |-- tweets: string (nullable = true)



In [8]:
# Start a streaming query that writes the stream into an in-memory table
# named "tweeters"; `tweets` is the query handle used later to check and
# stop the stream.
tweets = (
    df.writeStream
    .format("memory")
    .queryName("tweeters")
    .outputMode("update")
    .start()
)

In [9]:
df = spark.sql(""" SELECT * FROM tweeters """)

In [16]:
df.show()

+--------------------+
|              tweets|
+--------------------+
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
|{"created_at":"Mo...|
+--------------------+
only showing top 20 rows



In [17]:
ddf = parseJSONCols(df, 'tweets', sanitize=True)
ddf.printSchema()

StructType(List(StructField(tweets,StructType(List(StructField(contributors,StringType,true),StructField(coordinates,StructType(List(StructField(coordinates,ArrayType(DoubleType,true),true),StructField(type,StringType,true))),true),StructField(created_at,StringType,true),StructField(display_text_range,ArrayType(LongType,true),true),StructField(entities,StructType(List(StructField(hashtags,ArrayType(StructType(List(StructField(indices,ArrayType(LongType,true),true),StructField(text,StringType,true))),true),true),StructField(media,ArrayType(StructType(List(StructField(additional_media_info,StructType(List(StructField(monetizable,BooleanType,true))),true),StructField(display_url,StringType,true),StructField(expanded_url,StringType,true),StructField(id,LongType,true),StructField(id_str,StringType,true),StructField(indices,ArrayType(LongType,true),true),StructField(media_url,StringType,true),StructField(media_url_https,StringType,true),StructField(sizes,StructType(List(StructField(large,Str

In [19]:
tweets.isActive

True

In [20]:
ddf.show()

+--------------------+
|              tweets|
+--------------------+
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
|[,, Mon Jun 01 14...|
+--------------------+
only showing top 20 rows



In [77]:
ddf.select("tweets.created_at", "tweets.coordinates.coordinates", "tweets.user.followers_count").show(truncate=False)

+------------------------------+-----------+---------------+
|created_at                    |coordinates|followers_count|
+------------------------------+-----------+---------------+
|Mon Jun 01 14:57:24 +0000 2020|null       |2293           |
|Mon Jun 01 14:57:24 +0000 2020|null       |37             |
|Mon Jun 01 14:57:24 +0000 2020|null       |1257           |
|Mon Jun 01 14:57:25 +0000 2020|null       |61             |
|Mon Jun 01 14:57:25 +0000 2020|null       |53             |
|Mon Jun 01 14:57:25 +0000 2020|null       |14             |
|Mon Jun 01 14:57:25 +0000 2020|null       |110            |
|Mon Jun 01 14:57:25 +0000 2020|null       |78             |
|Mon Jun 01 14:57:25 +0000 2020|null       |80             |
|Mon Jun 01 14:57:26 +0000 2020|null       |318            |
|Mon Jun 01 14:57:26 +0000 2020|null       |25             |
|Mon Jun 01 14:57:27 +0000 2020|null       |28             |
|Mon Jun 01 14:57:27 +0000 2020|null       |1249           |
|Mon Jun 01 14:57:27 +00

In [78]:
ddf.select("tweets.created_at", "tweets.coordinates.coordinates", "tweets.user.followers_count", "tweets.quoted_status.text").show(truncate=False)

+------------------------------+-----------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|created_at                    |coordinates|followers_count|text                                                                                                                                        |
+------------------------------+-----------+---------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|Mon Jun 01 14:57:24 +0000 2020|null       |2293           |null                                                                                                                                        |
|Mon Jun 01 14:57:24 +0000 2020|null       |37             |null                                                                                                                                

In [45]:
ddf.count()

3714

In [68]:
ddf.select("tweets.created_at", "tweets.coordinates.coordinates", "tweets.user.followers_count", "tweets.extended_tweet.full_text") \
                .where(col("tweets.coordinates").isNotNull()) \
                .show(truncate=False)

+------------------------------+---------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|created_at                    |coordinates                |followers_count|full_text                                                                                                                                                                                                                                            |
+------------------------------+---------------------------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Mon Jun 01 14:57:56 +0000 2020

## Setup `pandasUDF`

In [85]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import *

# Declare the function and create the UDF
def multiply_func(a):
    """Return ``a`` multiplied by two.

    Wrapped as a pandas UDF below, where ``a`` is presumably a
    ``pandas.Series`` batch of column values and the multiplication is
    applied element-wise.
    """
    doubled = a * 2
    return doubled

multiply = pandas_udf(multiply_func, returnType=LongType())

@pandas_udf(ArrayType(FloatType()))
def lat(v):
    """Drop the first entry of every coordinate array in ``v``.

    The tweet coordinates shown in this notebook are ``[longitude, latitude]``
    pairs, so the result is a one-element array holding the latitude.
    """
    def _after_first(coords):
        return coords[1:]

    return v.apply(_after_first)

@pandas_udf(ArrayType(FloatType()))
def long(v):
    """Keep only the first entry of every coordinate array in ``v``.

    The tweet coordinates shown in this notebook are ``[longitude, latitude]``
    pairs, so the result is a one-element array holding the longitude.
    """
    def _first_only(coords):
        return coords[:1]

    return v.apply(_first_only)

In [83]:
# Keep only tweets that carry both a coordinates object and an extended
# full text, projected down to the four fields used for plotting.
dd = (
    ddf
    .select(
        "tweets.created_at",
        "tweets.coordinates.coordinates",
        "tweets.user.followers_count",
        "tweets.extended_tweet.full_text",
    )
    .where(col("tweets.coordinates").isNotNull())
    .where(col("tweets.extended_tweet.full_text").isNotNull())
)

In [84]:
dd.show()

+--------------------+--------------------+---------------+--------------------+
|          created_at|         coordinates|followers_count|           full_text|
+--------------------+--------------------+---------------+--------------------+
|Mon Jun 01 15:03:...|  [-0.1094, 51.5141]|            880|Nuevue Cases with...|
|Mon Jun 01 15:03:...|  [-0.1094, 51.5141]|           1012|We say no to hatr...|
|Mon Jun 01 15:05:...|[-0.22166667, 51....|              3|🅵🆄🅻🅷🅰🅼 - 🅲...|
|Mon Jun 01 15:05:...|[-0.09838343, 51....|              3|🆂🅺🆈🆂🅲🆁🅰🅿?...|
|Mon Jun 01 15:09:...|[-0.19756, 51.36119]|            404|Make the most of ...|
|Mon Jun 01 15:11:...|[-0.34247548, 51....|           9498|#FindBird has whi...|
|Mon Jun 01 15:11:...|    [-3.68333, 40.4]|           5665|#bellamontiell #t...|
|Mon Jun 01 15:12:...|    [-3.68333, 40.4]|           5667|#bellamontiell #t...|
|Mon Jun 01 15:13:...|    [-3.68333, 40.4]|           5667|#bellamontiell #t...|
|Mon Jun 01 15:15:...|[-0.12840271, 51....|

In [86]:
dd.withColumn("follower_count_times_two", multiply(col("followers_count"))).show()

+--------------------+--------------------+---------------+--------------------+------------------------+
|          created_at|         coordinates|followers_count|           full_text|follower_count_times_two|
+--------------------+--------------------+---------------+--------------------+------------------------+
|Mon Jun 01 15:03:...|  [-0.1094, 51.5141]|            880|Nuevue Cases with...|                    1760|
|Mon Jun 01 15:03:...|  [-0.1094, 51.5141]|           1012|We say no to hatr...|                    2024|
|Mon Jun 01 15:05:...|[-0.22166667, 51....|              3|🅵🆄🅻🅷🅰🅼 - 🅲...|                       6|
|Mon Jun 01 15:05:...|[-0.09838343, 51....|              3|🆂🅺🆈🆂🅲🆁🅰🅿?...|                       6|
|Mon Jun 01 15:09:...|[-0.19756, 51.36119]|            404|Make the most of ...|                     808|
|Mon Jun 01 15:11:...|[-0.34247548, 51....|           9498|#FindBird has whi...|                   18996|
|Mon Jun 01 15:11:...|    [-3.68333, 40.4]|           5665|#b

In [87]:
# Add the one-element array columns produced by the `lat` and `long` UDFs.
# NOTE: the misspelled column name "longditude" is referenced by later cells
# (e.g. lldd_pdf['longditude']), so renaming it means updating those cells too.
ldd = dd.withColumn("latitude", lat(col("coordinates")))
lldd = ldd.withColumn("longditude", long(col("coordinates")))
lldd.show()

+--------------------+--------------------+---------------+--------------------+-----------+-------------+
|          created_at|         coordinates|followers_count|           full_text|   latitude|   longditude|
+--------------------+--------------------+---------------+--------------------+-----------+-------------+
|Mon Jun 01 15:03:...|  [-0.1094, 51.5141]|            880|Nuevue Cases with...|  [51.5141]|    [-0.1094]|
|Mon Jun 01 15:03:...|  [-0.1094, 51.5141]|           1012|We say no to hatr...|  [51.5141]|    [-0.1094]|
|Mon Jun 01 15:05:...|[-0.22166667, 51....|              3|🅵🆄🅻🅷🅰🅼 - 🅲...|   [51.475]|[-0.22166666]|
|Mon Jun 01 15:05:...|[-0.09838343, 51....|              3|🆂🅺🆈🆂🅲🆁🅰🅿?...|[51.510147]|[-0.09838343]|
|Mon Jun 01 15:09:...|[-0.19756, 51.36119]|            404|Make the most of ...| [51.36119]|   [-0.19756]|
|Mon Jun 01 15:11:...|[-0.34247548, 51....|           9498|#FindBird has whi...| [51.45603]|[-0.34247547]|
|Mon Jun 01 15:11:...|    [-3.68333, 40.4]|         

In [88]:
lldd.count()

11

# Convert to pandas for plotly

In [89]:
lldd_pdf = lldd.toPandas()

In [90]:
lldd_pdf['latitude'].tolist()[0].item()

51.51409912109375

In [91]:
import numpy as np

In [92]:
lati_series = lldd_pdf['latitude'].transform(lambda x: x.item())

In [93]:
longd_series = lldd_pdf['longditude'].transform(lambda x: x.item())

In [94]:
lldd_pdf['llat'] = lldd_pdf['latitude'].transform(lambda x: x.item())

In [95]:
lldd_pdf['llong'] = lldd_pdf['longditude'].transform(lambda x: x.item())

In [96]:
lldd_pdf.head()

Unnamed: 0,created_at,coordinates,followers_count,full_text,latitude,longditude,llat,llong
0,Mon Jun 01 15:03:41 +0000 2020,"[-0.1094, 51.5141]",880,Nuevue Cases with “active technology” just go...,[51.5141],[-0.1094],51.514099,-0.1094
1,Mon Jun 01 15:03:57 +0000 2020,"[-0.1094, 51.5141]",1012,We say no to hatred. \nThe time for change is ...,[51.5141],[-0.1094],51.514099,-0.1094
2,Mon Jun 01 15:05:07 +0000 2020,"[-0.22166667, 51.475]",3,🅵🆄🅻🅷🅰🅼 - 🅲🅷🅰🆁🅻🆃🅾🅽 🅾🅲🆃🅾🅱🅴🆁 2019 #fulhamfc #socc...,[51.475],[-0.22166666],51.474998,-0.221667
3,Mon Jun 01 15:05:40 +0000 2020,"[-0.09838343, 51.51014807]",3,🆂🅺🆈🆂🅲🆁🅰🅿🅴🆁🆂 🅵🆁🅾🅼 🅼🅸🅻🅻🅴🅽🅽🅸🆄🅼 🅱🆁🅸🅳🅶🅴 #milleniumb...,[51.510147],[-0.09838343],51.510147,-0.098383
4,Mon Jun 01 15:09:27 +0000 2020,"[-0.19756, 51.36119]",404,Make the most of Monday evening with a delicio...,[51.36119],[-0.19756],51.361191,-0.19756


In [60]:
import plotly.express as px

# Plot the geotagged tweets on an OpenStreetMap base layer, coloured by
# follower count.
fig = px.scatter_mapbox(
    lldd_pdf,
    lat="llat",
    lon="llong",
    color="followers_count",
    color_continuous_scale=px.colors.cyclical.IceFire,
    zoom=9.5,
    height=600,
)
fig.update_layout(
    mapbox_style="open-street-map",
    margin=dict(r=0, t=0, l=0, b=0),
)
fig.show()

In [61]:
import plotly.express as px

# Same scatter as above, drawn on the "carto-positron" base layer.
fig = px.scatter_mapbox(
    lldd_pdf,
    lat="llat",
    lon="llong",
    color="followers_count",
    color_continuous_scale=px.colors.cyclical.IceFire,
    zoom=9.5,
    height=600,
)
fig.update_layout(
    mapbox_style="carto-positron",
    margin=dict(r=0, t=0, l=0, b=0),
)
fig.show()

In [74]:
# SECURITY: never commit access tokens to a notebook. The Mapbox token is read
# from the MAPBOX_ACCESS_TOKEN environment variable instead; the token that was
# previously hardcoded here should be treated as leaked and revoked.
import warnings

token = os.environ.get("MAPBOX_ACCESS_TOKEN", "")
if not token:
    warnings.warn(
        "MAPBOX_ACCESS_TOKEN is not set; Mapbox-hosted styles such as 'dark' "
        "will not render without an access token."
    )

In [99]:
import plotly.express as px

# Same scatter on Mapbox's "dark" style, which is passed the access token
# held in `token`.
# NOTE(review): `followers_count` looks numeric, so plotly presumably uses a
# continuous colour scale and ignores `color_discrete_sequence` - confirm.
fig = px.scatter_mapbox(
    lldd_pdf,
    lat="llat",
    lon="llong",
    color="followers_count",
    color_discrete_sequence=["fuchsia"],
    zoom=9.5,
    height=600,
)
fig.update_layout(
    mapbox_style="dark",
    mapbox_accesstoken=token,
    margin=dict(r=0, t=0, l=0, b=0),
)
fig.show()

In [None]:
# res.select("tweets.created_at", "tweets.coordinates", "tweets.user.followers_count") \
#                 .where(col("tweets.coordinates").isNull()) \
#                 .where(col("tweets.user.followers_count") > 100000) \
#                 .show()

In [None]:
# res.select("tweets.created_at", "tweets.place.bounding_box.coordinates", "tweets.user.followers_count") \
#                 .where(col("tweets.place.bounding_box.coordinates").isNotNull()) \
#                 .show()

In [42]:
ddf.withColumn("follower_count_times_two", multiply(col("tweets.user.`followers_count`"))).show()

+--------------------+------------------------+
|              tweets|follower_count_times_two|
+--------------------+------------------------+
|[,, Mon Jun 01 14...|                    4586|
|[,, Mon Jun 01 14...|                      74|
|[,, Mon Jun 01 14...|                    2514|
|[,, Mon Jun 01 14...|                     122|
|[,, Mon Jun 01 14...|                     106|
|[,, Mon Jun 01 14...|                      28|
|[,, Mon Jun 01 14...|                     220|
|[,, Mon Jun 01 14...|                     156|
|[,, Mon Jun 01 14...|                     160|
|[,, Mon Jun 01 14...|                     636|
|[,, Mon Jun 01 14...|                      50|
|[,, Mon Jun 01 14...|                      56|
|[,, Mon Jun 01 14...|                    2498|
|[,, Mon Jun 01 14...|                     570|
|[,, Mon Jun 01 14...|                      30|
|[,, Mon Jun 01 14...|                    1854|
|[,, Mon Jun 01 14...|                     280|
|[,, Mon Jun 01 14...|                  

In [62]:
tweets.stop()