## Stream Processing Arquitectura Completa

## Prerequisits

In [1]:
pip install tweepy

Note: you may need to restart the kernel to use updated packages.


In [2]:
import sys
import os
import tweepy
import json
import pandas as pd
from pyspark import sql
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Create table with city values

In [3]:
city_list = ['Barcelona', 'Bilbao', 'Ibiza', 'Madrid', 'Oviedo', 'Sevilla', 'Valencia']
beach = [10, 0, 10, 0, 0, 0, 8]
city = [10, 9, 4, 10, 5, 7, 9]
nature = [3, 7, 9, 3, 9, 5, 6]
party = [8, 7, 10, 9, 4, 7, 8]
d = {'Beach': beach, 'City': city, 'Nature': nature, 'Party': party}

cities = pd.DataFrame(data=d, index=city_list)

cities

Unnamed: 0,Beach,City,Nature,Party
Barcelona,10,10,3,8
Bilbao,0,9,7,7
Ibiza,10,4,9,10
Madrid,0,10,3,9
Oviedo,0,5,9,4
Sevilla,0,7,5,7
Valencia,8,9,6,8


## Create streaming for Flats

In [4]:
# Initialize SparkSession
spark = SparkSession \
    .builder \
    .appName("QualityLife") \
    .getOrCreate()

Let's create a data processing for the flats

In [5]:
# Create flats_df stream
flats_df_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "broker:29092") \
  .option("subscribe", "flats") \
  .load()

flats_df_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Create schema for flats_df
schema_flats = StructType(
    [
        StructField('house_city', StringType(), True),
        StructField('house_rooms', StringType(), True),
        StructField('house_code', StringType(), True),
        StructField('house_rent', StringType(), True)
        
    ]
)

flats_df = flats_df_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .withColumn("value", from_json("value", schema_flats)) \
    .select(col('key'), col("timestamp"), col('value.*'))

flats_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- house_city: string (nullable = true)
 |-- house_rooms: string (nullable = true)
 |-- house_code: string (nullable = true)
 |-- house_rent: string (nullable = true)



In [7]:
# Create table to store query output in memory
flats_df.writeStream \
 .outputMode("append") \
 .format("memory") \
 .option("truncate", "false") \
 .queryName("flats_all") \
 .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f620e344a50>

In [8]:
spark.sql("select * from flats_all order by timestamp desc").show(truncate = False)

+---+---------+----------+-----------+----------+----------+
|key|timestamp|house_city|house_rooms|house_code|house_rent|
+---+---------+----------+-----------+----------+----------+
+---+---------+----------+-----------+----------+----------+



In [9]:
# TESTING!!!!
city_name = 'Valencia'
members = 7
max_rent = int('29999')*0.3/12
query = "SELECT * FROM flats_all WHERE house_city='{}' AND house_rooms>={} AND house_rent<={} ORDER BY house_rent DESC LIMIT 1".format(city_name, members, max_rent)
print(query)
city_flats = spark.sql(query).toPandas()
flat_code = city_flats.house_code

SELECT * FROM flats_all WHERE house_city='Valencia' AND house_rooms>=7 AND house_rent<=749.9749999999999 ORDER BY house_rent DESC LIMIT 1


## Create streaming for families

In [10]:
# Create families_df_stream
families_df_stream = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "broker:29092") \
  .option("subscribe", "families") \
  .load()

families_df_stream.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [11]:
# TESTING!!!!!
families_test = families_df_stream.selectExpr("CAST(value AS STRING)")

families_test.writeStream \
.outputMode("update") \
.format("memory") \
.option("truncate", "false") \
.queryName("families_test") \
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f620e0ffa90>

In [12]:
# TESTING!!!!!
spark.sql("select * from families_test").show(truncate = False)

+-----+
|value|
+-----+
+-----+



In [13]:
# Create schema for families_df
schema_families = StructType(
    [
        StructField('people_city', StringType(), True),
        StructField('people_members', StringType(), True),
        StructField('people_party', StringType(), True),
        StructField('people_beach', StringType(), True),
        StructField('tweet_id', StringType(), True),
        StructField('people_salary', StringType(), True),
        StructField('people_age', StringType(), True),
        StructField('people_name', StringType(), True),
        StructField('people_nature', StringType(), True)
    ]
)

families_df = families_df_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp") \
    .withColumn("value", from_json("value", schema_families)) \
    .select(col('key'), col("timestamp"), col('value.*'))

families_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- people_city: string (nullable = true)
 |-- people_members: string (nullable = true)
 |-- people_party: string (nullable = true)
 |-- people_beach: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- people_salary: string (nullable = true)
 |-- people_age: string (nullable = true)
 |-- people_name: string (nullable = true)
 |-- people_nature: string (nullable = true)



In [14]:
# Create table to store query output in memory
families_df.writeStream \
 .outputMode("append") \
 .format("memory") \
 .option("truncate", "false") \
 .queryName("families_all") \
 .start()

<pyspark.sql.streaming.StreamingQuery at 0x7f620e334a90>

In [15]:
spark.sql("select * from families_all order by timestamp desc").show(truncate = False)

+---+---------+-----------+--------------+------------+------------+--------+-------------+----------+-----------+-------------+
|key|timestamp|people_city|people_members|people_party|people_beach|tweet_id|people_salary|people_age|people_name|people_nature|
+---+---------+-----------+--------------+------------+------------+--------+-------------+----------+-----------+-------------+
+---+---------+-----------+--------------+------------+------------+--------+-------------+----------+-----------+-------------+



In [16]:
# DEFINE FUNCTIONS FOR DATA STREAM

# best_city
def best_city(df, hobbies):
    hobbies = [int(i) for i in hobbies]
    df = abs(df - hobbies)
    df['total'] = df.sum(axis=1)
    df = df.sort_values('total')
    city_name = df.index.values[0]
    return city_name

def flats_city(city_name, members, max_rent):
    query = (
        "SELECT * FROM flats_all "
        "WHERE house_city='{}' AND house_rooms>={} AND house_rent<={} "
        "ORDER BY house_rent DESC LIMIT 1".format(city_name, members, max_rent)
    )
    city_flats = spark.sql(query).toPandas()
    if flat_code.empty:
        flat_code = 0
    else:
        flat_code = int(city_flats.house_code)
    return flat_code

# reply_tweet
def reply_tweet(tweet_id, reply_text):
    #AUTHENTICATION
    consumer_key = "V7ynrT3Ur3dx1PBLoKh8098Cm"
    consumer_secret = "Na0QjDAVrh72YQUmAS4soVJjdlkDcdYRvWHTT4dThq7r9K7o1k"
    access_token = "775989731989581825-DZoRlXKk6ykuEl7b7gJrV62HZ1DMBsE"
    access_token_secret = "InGNAuYf68BywigWoDYBAyIFGDqVZ9bRMPje4tGXEuq5c"

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    tweet = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
    
    # Reply tweet
    tweet.update_status(reply_text, in_reply_to_status_id=tweet_id)
    


In [17]:
# DEFIEN process_row function for each stream row
# process_row
def process_row(row):
    # Retrieve best city
    hobbies = [row['people_beach'], row['people_city'], row['people_nature'], row['people_party']]
    city_name = best_city(cities, hobbies)
    # Retrieve best flat
    members = int(row['people_members'])
    max_rent = int(row['people_salary']) * 0.3 / 12 
    flat_code = flats_city(city_name, members, max_rent)
    # reply_tweet
    tweet_id = row['tweet_id']
    people_name = row['people_name']
    if flat_code == 0:
        reply_text = (
            "Hi {}, we still don't have the perfect house for you in our database, "
            "but we will gladly help you find it. Don't hesitate to contact us! #mdaedem".format(people_name)
        )
    else:
        reply_text = (
            "Hi {}, we have found a perfect house for you in {} with code {}. "
            "Please, contact us for further details! #mdaedem".format(people_name,city_name,flat_code)
        )
    reply_tweet(tweet_id, reply_text)
    

In [18]:
families_df_query = families_df.writeStream \
 .foreach(process_row) \
 .start()

Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/serializers.py", line 597, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 863, in dumps
    cp.dump(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 260, in dump
    return Pickler.dump(self, obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/opt/conda/lib/python3.7/pickle.py", line 789, in save_tuple
    save(element)
  File "/opt/conda/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 406, in save_function
    self.save_function_tuple(obj)
  File "/usr/local/spark/python/pyspark/cloudpickle.py", line 549, in save_function_tuple
    save(state)
  File "/opt/c

PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.