<a href="https://colab.research.google.com/github/rajatkb/Machine-Learning-Pipelining-With-Spark-Tensorflow/blob/main/Machine_Learning_Pipelining_using_Spark_and_Tensorflow_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparing environment

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://mirrors.estointernet.in/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install py4j



In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [3]:
import findspark
findspark.init("spark-3.0.1-bin-hadoop2.7")# SPARK_HOME


from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.ui.showConsoleProgress", "true").getOrCreate() 

In [5]:
## Test
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [6]:
spark.sparkContext.defaultParallelism

2

## dataset

Dataset Kaggle : [tweeter sentiment for airline](https://www.kaggle.com/crowdflower/twitter-airline-sentiment)

# Data Pipeline

- Understanding the *what* and *why* 
- **Python :** Generators , Python Numpy vectorised operation , Trust the framework
- **Spark :** As a replacement to Pandas, data preprocessing , Use cases 
- **Tensorflow :** TfRecords API , preprocessing & binary serialization , data -> gpu training pipelining

## Python

### Generators

In [None]:
## fun1
## fun2 ^


In [68]:
with open('Tweets.csv') as file:
    print(file.read())

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [71]:
def retGen(filename):
    with open(filename) as file:
        for line in file:
            yield line

In [73]:
for line in retGen(filename='Tweets.csv'):
    print(line)
    break
## 1 char -> 8 bits (0)
## integer -> 4bytes -> 32bits

tweet_id,airline_sentiment,airline_sentiment_confidence,negativereason,negativereason_confidence,airline,airline_sentiment_gold,name,negativereason_gold,retweet_count,text,tweet_coord,tweet_created,tweet_location,user_timezone



In [74]:
import numpy as np

In [75]:
a = np.random.rand(5 , 5)
a

array([[0.13171713, 0.69582503, 0.12882325, 0.43408877, 0.69102702],
       [0.92619879, 0.30199973, 0.400361  , 0.94925899, 0.46643273],
       [0.64269566, 0.02992722, 0.32962271, 0.04712762, 0.32514932],
       [0.34551986, 0.9435124 , 0.12822285, 0.92502212, 0.49776182],
       [0.52617413, 0.76673489, 0.80774216, 0.11767549, 0.38528107]])

In [76]:
## PYTHON LOOPSS ARE SLOW

In [77]:
a > 0.5

array([[False,  True, False, False,  True],
       [ True, False, False,  True, False],
       [ True, False, False, False, False],
       [False,  True, False,  True, False],
       [ True,  True,  True, False, False]])

In [None]:
## GPU ALTERNATIVE TO NUMPY -> JAX (Google) , Numba 

In [78]:
a[a> 0.5] = 5

In [79]:
a

array([[0.13171713, 5.        , 0.12882325, 0.43408877, 5.        ],
       [5.        , 0.30199973, 0.400361  , 5.        , 0.46643273],
       [5.        , 0.02992722, 0.32962271, 0.04712762, 0.32514932],
       [0.34551986, 5.        , 0.12822285, 5.        , 0.49776182],
       [5.        , 5.        , 5.        , 0.11767549, 0.38528107]])

In [81]:
## Question 1

a = np.random.randint((10 , 10))

In [83]:
## Answer

## a =?
## a[i][j] < 5 = 0
## a[i][j] > 5 = 100


## Spark

* [Cheat-Sheet](https://github.com/kevinschaich/pyspark-cheatsheet)

In [7]:
from pyspark.sql import functions as F, types as T

In [8]:
import pprint
pp = pprint.PrettyPrinter(indent = 4)

In [11]:
### Read the data first

file_name = 'Tweets.csv'
## pandas -> Dataframe, 
## RDD
## resilient distributed dataset
## operation on a RDD -> DAG 

df = spark.read.format('csv') \
      .option('header',True) \
      .option('inferSchema',True) \
      .load(file_name)

In [12]:
df.show(20) ## non agregating operation but no RDD

+--------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|            tweet_id|airline_sentiment|airline_sentiment_confidence|negativereason|negativereason_confidence|       airline|airline_sentiment_gold|           name|negativereason_gold|retweet_count|                text|tweet_coord|       tweet_created|      tweet_location|       user_timezone|
+--------------------+-----------------+----------------------------+--------------+-------------------------+--------------+----------------------+---------------+-------------------+-------------+--------------------+-----------+--------------------+--------------------+--------------------+
|  570306133677760513|          neutral|                         1.0|          null|                     null|Virgi

In [19]:
df.describe(['*']).show()

+-------+--------------------+-----------------+----------------------------+--------------------+-------------------------+--------------------+----------------------+-------------+-------------------+------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|summary|            tweet_id|airline_sentiment|airline_sentiment_confidence|      negativereason|negativereason_confidence|             airline|airline_sentiment_gold|         name|negativereason_gold|     retweet_count|                text|         tweet_coord|       tweet_created|          tweet_location|       user_timezone|
+-------+--------------------+-----------------+----------------------------+--------------------+-------------------------+--------------------+----------------------+-------------+-------------------+------------------+--------------------+--------------------+--------------------+------------------------+--------------------+
|  coun

In [13]:
df.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)



In [14]:
print("Total count : ",df.count())

Total count :  14837


### Clean Tweet airline_sentiment

In [16]:
df_clean_airline_sentiment = df.withColumn('clean_airline_sentiment' ,F.regexp_extract(df.airline_sentiment , r'^(positive|negative|neutral)$' , 1) )
df_clean_airline_sentiment = df_clean_airline_sentiment.na.drop(how='any', subset=['clean_airline_sentiment'])

In [17]:
df_clean_airline_sentiment = df_clean_airline_sentiment.filter(df_clean_airline_sentiment.clean_airline_sentiment != '')

In [18]:
df_clean_airline_sentiment.select('clean_airline_sentiment').distinct().show()

+-----------------------+
|clean_airline_sentiment|
+-----------------------+
|               positive|
|                neutral|
|               negative|
+-----------------------+



In [21]:
df_aug1 = df_clean_airline_sentiment.drop('airline_sentiment').withColumnRenamed('clean_airline_sentiment', 'airline_sentiment')
df_clean = df_aug1

In [22]:
## small dataset no
## BIG DATASET -> yes
df_clean.select('airline_sentiment','tweet_id').sample(0.3).show(20)

## lazy evaluation 
## Tranformations 
## Actions -> aggregates partitions (data partitions)
## show()
## 

+-----------------+------------------+
|airline_sentiment|          tweet_id|
+-----------------+------------------+
|          neutral|570306133677760513|
|          neutral|570301083672813571|
|         negative|570301031407624196|
|         positive|570289724453216256|
|         positive|570287408438120448|
|         positive|570285904809598977|
|         negative|570282469121007616|
|         positive|570277724385734656|
|         negative|570276917301137409|
|         positive|570267956648792064|
|         positive|570264145116819457|
|         positive|570259420287868928|
|         negative|570249102404923392|
|         negative|570114021854212096|
|         negative|570084582780899328|
|          neutral|570016304284901379|
|         positive|570012257549070337|
|          neutral|570011341483843584|
|          neutral|570009713447825408|
|         negative|569967019958730753|
+-----------------+------------------+
only showing top 20 rows



### Clean Tweet airline_sentiment_confidence 

In [26]:
df_aug1.select('airline_sentiment_confidence').sample(0.3).show(20)

+----------------------------+
|airline_sentiment_confidence|
+----------------------------+
|                      0.3486|
|                         1.0|
|                         1.0|
|                         1.0|
|                         1.0|
|                      0.6705|
|                         1.0|
|                         1.0|
|                         1.0|
|                      0.6791|
|                      0.6688|
|                      0.6578|
|                         1.0|
|                      0.7007|
|                         1.0|
|                         1.0|
|                         1.0|
|                      0.6492|
|                         1.0|
|                      0.6792|
+----------------------------+
only showing top 20 rows



In [27]:
df_aug1.filter(df_aug1.airline_sentiment_confidence == '').count() ## Its already clean

0

In [28]:
df_aug2 = df_aug1.withColumn('clean_airline_sentiment_confidence', F.regexp_extract(df.airline_sentiment_confidence , r'^(\d+.\d+)$' , 1).cast("float"))

In [29]:
df_aug2.select('clean_airline_sentiment_confidence').sample(0.3).show(20)

+----------------------------------+
|clean_airline_sentiment_confidence|
+----------------------------------+
|                               1.0|
|                            0.6745|
|                            0.6559|
|                            0.6769|
|                               1.0|
|                               1.0|
|                            0.6842|
|                               1.0|
|                               1.0|
|                               1.0|
|                             0.615|
|                               1.0|
|                               1.0|
|                            0.6207|
|                            0.6578|
|                               1.0|
|                               1.0|
|                               1.0|
|                               1.0|
|                            0.7007|
+----------------------------------+
only showing top 20 rows



In [30]:
df_aug2.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- airline_sentiment_confidence: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- clean_airline_sentiment_confidence: float (nullable = true)



In [31]:
df_aug2.select('clean_airline_sentiment_confidence', 'airline_sentiment_confidence').describe().show()

+-------+----------------------------------+----------------------------+
|summary|clean_airline_sentiment_confidence|airline_sentiment_confidence|
+-------+----------------------------------+----------------------------+
|  count|                             14640|                       14640|
|   mean|                0.9001688524429264|          0.9001688524590152|
| stddev|               0.16282995912820175|          0.1628299590986725|
|    min|                             0.335|                       0.335|
|    max|                               1.0|                         1.0|
+-------+----------------------------------+----------------------------+



In [32]:
df_aug2 = df_aug2.drop('airline_sentiment_confidence').withColumnRenamed('clean_airline_sentiment_confidence', 'airline_sentiment_confidence')
df_clean = df_aug2

In [33]:
df_clean.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: float (nullable = true)



### Clean Tweet retweet_count

In [34]:
df_aug2.select('retweet_count').distinct().count()

19

In [35]:
df_aug2.select('retweet_count').distinct().show()


+-------------+
|retweet_count|
+-------------+
|           31|
|           28|
|           44|
|           22|
|         null|
|            1|
|            6|
|            3|
|            5|
|           15|
|            9|
|            4|
|            8|
|            7|
|           32|
|           11|
|            2|
|            0|
|           18|
+-------------+



In [36]:
# extract numbers
df_aug3 = df_aug2.withColumn('clean_retweet_count', F.regexp_extract(df_aug2.retweet_count , r'^(\d+)$' , 1))

In [37]:
df_aug3 = df_aug3.filter(df_aug3.clean_retweet_count != '')

In [38]:
df_aug3.select('retweet_count').distinct().show()

+-------------+
|retweet_count|
+-------------+
|           31|
|           28|
|           44|
|           22|
|            1|
|            6|
|            3|
|            5|
|           15|
|            9|
|            4|
|            8|
|            7|
|           32|
|           11|
|            2|
|            0|
|           18|
+-------------+



In [39]:
df_aug3 = df_aug3.withColumn('clean_retweet_count', df_aug3.clean_retweet_count.cast('integer'))

In [40]:
df_aug3.select('retweet_count').distinct().show()

+-------------+
|retweet_count|
+-------------+
|           31|
|           28|
|           44|
|           22|
|            1|
|            6|
|            3|
|            5|
|           15|
|            9|
|            4|
|            8|
|            7|
|           32|
|           11|
|            2|
|            0|
|           18|
+-------------+



In [41]:
df_aug3.printSchema()

root
 |-- tweet_id: string (nullable = true)
 |-- negativereason: string (nullable = true)
 |-- negativereason_confidence: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- airline_sentiment_gold: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negativereason_gold: string (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_coord: string (nullable = true)
 |-- tweet_created: string (nullable = true)
 |-- tweet_location: string (nullable = true)
 |-- user_timezone: string (nullable = true)
 |-- airline_sentiment: string (nullable = true)
 |-- airline_sentiment_confidence: float (nullable = true)
 |-- clean_retweet_count: integer (nullable = true)



In [42]:
df_clean = df_aug3

### Select Columns of interest 

In [43]:
df_clean.select('airline_sentiment','airline_sentiment_confidence', 'retweet_count').sample(0.4, seed=34).show(30)

+-----------------+----------------------------+-------------+
|airline_sentiment|airline_sentiment_confidence|retweet_count|
+-----------------+----------------------------+-------------+
|          neutral|                      0.6837|            0|
|         negative|                         1.0|            0|
|         positive|                      0.6745|            0|
|          neutral|                      0.6769|            0|
|         positive|                         1.0|            0|
|         positive|                         1.0|            0|
|         positive|                         1.0|            0|
|         positive|                         1.0|            0|
|         negative|                      0.6705|            0|
|         positive|                         1.0|            0|
|         positive|                         1.0|            0|
|         negative|                         1.0|            0|
|          neutral|                       0.615|       

In [69]:
dataset = df_clean.select('airline_sentiment','airline_sentiment_confidence', 'retweet_count')

## Converting target class One-Hot Encoding

In [70]:
#getting distinct values
sentiment_list = list(map(lambda x: x.asDict()['airline_sentiment'],  dataset.select('airline_sentiment').distinct().collect()))

In [71]:
sentiment_list

['positive', 'neutral', 'negative']

In [72]:
sentiment_label_class = dict(map(lambda x: (x[1],x[0]),enumerate(sentiment_list)))

In [73]:
sentiment_label_class

{'negative': 2, 'neutral': 1, 'positive': 0}

In [74]:
## Bummer if we do this here , we end up creating massive extra redundant data that we can always generate when keeping in TFrecord

In [75]:
df_clean.count()

14632

In [83]:
from pyspark.sql import Row
_tmp1 = spark.createDataFrame([Row(col=1 , val='a'), Row(col=2 , val='b') , Row(col=3 , val='c')])
_tmp2 = spark.createDataFrame([Row(col=1 , val='d'), Row(col=2 , val='e') , Row(col=3 , val='f')])
_tmp = _tmp1.join(_tmp2 , on='col' , how='inner')
_tmp.show()

+---+---+---+
|col|val|val|
+---+---+---+
|  1|  a|  d|
|  3|  c|  f|
|  2|  b|  e|
+---+---+---+



In [125]:
## so we just replace it by the index
from pyspark.sql import Row

## GOING FULL RDD MODE for this one

def transform(r):
    data = r.asDict()
    return Row(**data ,label= sentiment_label_class[data['airline_sentiment']] )

prepared_dataset = dataset.rdd.map(transform).toDF().drop('airline_sentiment')
prepared_dataset = prepared_dataset.withColumn('retweet_count', prepared_dataset.retweet_count.cast('integer'))
prepared_dataset = prepared_dataset.withColumn('label', prepared_dataset.label.cast('integer'))
prepared_dataset.show()

prepared_dataset.count() ### -> MLIB or -> GCP Bucket -> TPU/GPU

+----------------------------+-------------+-----+
|airline_sentiment_confidence|retweet_count|label|
+----------------------------+-------------+-----+
|                         1.0|            0|    1|
|         0.34860000014305115|            0|    0|
|          0.6837000250816345|            0|    1|
|                         1.0|            0|    2|
|                         1.0|            0|    2|
|                         1.0|            0|    2|
|          0.6744999885559082|            0|    0|
|          0.6340000033378601|            0|    1|
|          0.6559000015258789|            0|    0|
|                         1.0|            0|    0|
|          0.6769000291824341|            0|    1|
|                         1.0|            0|    0|
|                         1.0|            0|    0|
|          0.6450999975204468|            0|    0|
|                         1.0|            0|    0|
|          0.6841999888420105|            0|    2|
|                         1.0| 

14632

In [126]:
prepared_dataset.printSchema()

root
 |-- airline_sentiment_confidence: double (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- label: integer (nullable = true)



# TensorFlow TfRecords

In [127]:
import tensorflow as tf
import tensorflow.keras as keras
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn

In [135]:
prepared_dataset.head(10)



[Row(airline_sentiment_confidence=1.0, retweet_count=0, label=1),
 Row(airline_sentiment_confidence=0.34860000014305115, retweet_count=0, label=0),
 Row(airline_sentiment_confidence=0.6837000250816345, retweet_count=0, label=1),
 Row(airline_sentiment_confidence=1.0, retweet_count=0, label=2),
 Row(airline_sentiment_confidence=1.0, retweet_count=0, label=2),
 Row(airline_sentiment_confidence=1.0, retweet_count=0, label=2),
 Row(airline_sentiment_confidence=0.6744999885559082, retweet_count=0, label=0),
 Row(airline_sentiment_confidence=0.6340000033378601, retweet_count=0, label=1),
 Row(airline_sentiment_confidence=0.6559000015258789, retweet_count=0, label=0),
 Row(airline_sentiment_confidence=1.0, retweet_count=0, label=0)]

In [136]:
dataset_gen = prepared_dataset.toLocalIterator(prefetchPartitions=True)    

In [132]:
# prepared_dataset.count()
# next(dataset_gen)

Row(airline_sentiment_confidence=0.6837000250816345, retweet_count=0, label=1)

## Create static tf record by consuming Spark Data

In [116]:
def _bytes_feature(value):
  """Returns a bytes_list from a string / byte."""
  # If the value is an eager tensor BytesList won't unpack a string from an EagerTensor.
  if isinstance(value, type(tf.constant(0))):
      value = value.numpy() 
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
  """Returns an int64_list from a bool / enum / int / uint."""
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

In [117]:
#Defining the features of the images
## 
## 
def serialize_example(x , y):
  features = {
      'x': _bytes_feature(x),
      'y': _int64_feature(y),
  }
  #Creating Features message
  example_proto = tf.train.Example(features=tf.train.Features(feature=features))
  return example_proto.SerializeToString() 

In [118]:
#reading from the file
def read_tfrecord(serialized_example):
    feature_description = {
        'x': tf.io.FixedLenFeature((), tf.string),
        'y': tf.io.FixedLenFeature((), tf.int64),
    }
    example = tf.io.parse_single_example(serialized_example, feature_description)
    
    _shape = tf.stack([2])
    _raw = example['x']
    # Decode the raw bytes so it becomes a tensor with type.
    x = tf.io.decode_raw(_raw, tf.float64)
    x = tf.reshape(x, _shape)
    # image = tf.reverse(image, axis=[2])
    y = tf.one_hot(example['y'] , depth = len(sentiment_list)) ## from way above
    return x,y 

In [121]:
def convertToRecord(dataset_gen , tf_filename='./dataset.tfrecords'):
    with tf.io.TFRecordWriter(tf_filename) as writer:
          for sample in dataset_gen:
              _dt = sample.asDict()
              x = np.array([_dt['airline_sentiment_confidence'], _dt['retweet_count']])
              x = x.astype(np.float64)
              xb = x.tobytes()
              y = _dt['label']
              sd = serialize_example(xb , y)
              writer.write(sd)

In [137]:
convertToRecord(dataset_gen)

In [138]:
import multiprocessing
cnt = multiprocessing.cpu_count()

batch_size = 32

def load_dataset(filenames , batch_size = 32,shuffle=True):
    ignore_order = tf.data.Options()
    ignore_order.experimental_deterministic = False  # disable order, increase speed
    dataset = tf.data.TFRecordDataset(
        filenames
    )  
    # automatically interleaves reads from multiple files
    dataset = dataset.with_options(
        ignore_order
    )  # uses data as soon as it streams in, rather than in its original order
    dataset = dataset.prefetch(buffer_size = batch_size * 2)
    dataset = dataset.map(
        read_tfrecord , num_parallel_calls =  tf.data.experimental.AUTOTUNE
    )
    
    dataset = dataset.batch(batch_size)
    if shuffle:
      dataset = dataset.shuffle(batch_size)
    
    # returns a dataset of (image, label) pairs if labeled=True or just images if labeled=False
    return dataset

In [139]:
dataset = load_dataset(['dataset.tfrecords'])

In [67]:
## Question : Try to get count of 
## class 1 , 2 , 3 -> positive , negative , neutral