**16.4.2 Spark DataFrames and Datasets**

In [None]:
#Install PySpark packages 

import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("BlueSkyHotelReviews").getOrCreate()

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:7 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Get:15 htt

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()

In [None]:
dataframe = spark.createDataFrame([(0,"Here is our DataFrame"),
                                   (1,"We are making one from screach"),
                                   (2, "This will look very similar to a Pandas DataFrame")
                                   ],["id", "words"])
dataframe.show()

+---+--------------------+
| id|               words|
+---+--------------------+
|  0|Here is our DataF...|
|  1|We are making one...|
|  2|This will look ve...|
+---+--------------------+



In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("food.csv"), sep=",", header=True)
df.show()

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+



In [None]:
# Print our schema
df.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: string (nullable = true)



In [None]:
# Show the columns
df.columns

['food', 'price']

In [None]:
# Describe our data
df.describe()

DataFrame[summary: string, food: string, price: string]

In [None]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [None]:
# Next we need to create the list of struct fields
schema = [StructField("food", StringType(), True), StructField("price", IntegerType(), True),]
schema

[StructField(food,StringType,true), StructField(price,IntegerType,true)]

In [None]:
# Pass in our fields
final = StructType(fields=schema)
final

StructType(List(StructField(food,StringType,true),StructField(price,IntegerType,true)))

In [None]:
# Read our data with our new schema
dataframe = spark.read.csv(SparkFiles.get("food.csv"), schema=final, sep=",", header=True)
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: integer (nullable = true)



In [None]:
dataframe['price']

Column<b'price'>

In [None]:
type(dataframe['price'])

pyspark.sql.column.Column

In [None]:
dataframe.select('price')

DataFrame[price: int]

In [None]:
type(dataframe.select('price'))

pyspark.sql.dataframe.DataFrame

In [None]:
dataframe.select('price').show()

+-----+
|price|
+-----+
|    0|
|   12|
|   10|
+-----+



In [None]:
# Add new column
dataframe.withColumn('newprice', dataframe['price']).show()

+-------+-----+--------+
|   food|price|newprice|
+-------+-----+--------+
|  pizza|    0|       0|
|  sushi|   12|      12|
|chinese|   10|      10|
+-------+-----+--------+



In [None]:
# Update colomn name
dataframe.withColumnRenamed('price', 'newerprice').show()

+-------+----------+
|   food|newerprice|
+-------+----------+
|  pizza|         0|
|  sushi|        12|
|chinese|        10|
+-------+----------+



In [None]:
#Double the price
dataframe.withColumn('doubleprice', dataframe['price']*2).show()

+-------+-----+-----------+
|   food|price|doubleprice|
+-------+-----+-----------+
|  pizza|    0|          0|
|  sushi|   12|         24|
|chinese|   10|         20|
+-------+-----+-----------+



In [None]:
# Add a dollar to the price
dataframe.withColumn('add_one_dollar', dataframe['price']+1).show()

+-------+-----+--------------+
|   food|price|add_one_dollar|
+-------+-----+--------------+
|  pizza|    0|             1|
|  sushi|   12|            13|
|chinese|   10|            11|
+-------+-----+--------------+



In [None]:
# Half the price
dataframe.withColumn('half_price',dataframe['price']/2).show()

+-------+-----+----------+
|   food|price|half_price|
+-------+-----+----------+
|  pizza|    0|       0.0|
|  sushi|   12|       6.0|
|chinese|   10|       5.0|
+-------+-----+----------+



**16.4.3 Spark Functions**

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameFunctions").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_1/wine.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("wine.csv"), sep=",", header=True)
# Show DataFrame
df.show()

+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|country|         description|         designation|points|price|          province|            region_1|         region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|        California|         Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|  110|    Northern Spain|                Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|        California|      Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley|
|     US|This spent 20

In [None]:
# Order a DataFrame by ascending values
df.orderBy(df["points"].desc())

DataFrame[country: string, description: string, designation: string, points: string, price: string, province: string, region_1: string, region_2: string, variety: string, winery: string]

In [None]:
# actions
df.orderBy(df["points"].desc()).show(5)

+-------+--------------------+--------------------+------+-----+----------+--------------------+---------------+---------------+--------------------+
|country|         description|         designation|points|price|  province|            region_1|       region_2|        variety|              winery|
+-------+--------------------+--------------------+------+-----+----------+--------------------+---------------+---------------+--------------------+
|  Italy|Here's a ���wow�۝...|            Messorio|    99|  320|   Tuscany|             Toscana|           null|         Merlot|        Le Macchiole|
|     US|A stupendous Pino...|Precious Mountain...|    99|   94|California|        Sonoma Coast|         Sonoma|     Pinot Noir|     Williams Selyem|
|  Italy|The 2007 Ornellai...|           Ornellaia|    99|  200|   Tuscany|  Bolgheri Superiore|           null|      Red Blend|Tenuta dell'Ornel...|
|     US|Depth and texture...|Royal City Stoner...|    99|  140|Washington|Columbia Valley (WA)|Colu

In [None]:
# Import functions
from pyspark.sql.functions import avg
df.select(avg("points")).show()

+-----------------+
|      avg(points)|
+-----------------+
|87.88834105383143|
+-----------------+



In [None]:
# Filter
df.filter("price<20").show(5)

+--------+--------------------+-----------+------+-----+----------+--------------------+-----------------+--------------+--------------------+
| country|         description|designation|points|price|  province|            region_1|         region_2|       variety|              winery|
+--------+--------------------+-----------+------+-----+----------+--------------------+-----------------+--------------+--------------------+
|Bulgaria|This Bulgarian Ma...|    Bergul̩|    90|   15|  Bulgaria|                null|             null|        Mavrud|        Villa Melnik|
|   Spain|Earthy plum and c...|     Amandi|    90|   17|   Galicia|       Ribeira Sacra|             null|       Menc�_a|      Don Bernardino|
|      US|There's a lot to ...|       null|    90|   18|California|Russian River Valley|           Sonoma|    Chardonnay|            De Loach|
|      US|Massively fruity,...|       null|    91|   19|    Oregon|   Willamette Valley|Willamette Valley|    Pinot Gris|   Trinity Vineyards|

In [None]:
# Filter by price on certain columns
df.filter("price<20").select(['points','country', 'winery', 'price']).show(5)

+------+--------+--------------------+-----+
|points| country|              winery|price|
+------+--------+--------------------+-----+
|    90|Bulgaria|        Villa Melnik|   15|
|    90|   Spain|      Don Bernardino|   17|
|    90|      US|            De Loach|   18|
|    91|      US|   Trinity Vineyards|   19|
|    91|Portugal|Adega Cooperativa...|   15|
+------+--------+--------------------+-----+
only showing top 5 rows



In [None]:
# Filter on exace stateMB
df.filter(df["country"] == "US").show()

+-------+--------------------+--------------------+------+-----+----------+--------------------+-----------------+------------------+--------------------+
|country|         description|         designation|points|price|  province|            region_1|         region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+----------+--------------------+-----------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|California|         Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|California|      Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley|
|     US|This spent 20 mon...|             Reserve|    96|   65|    Oregon|   Willamette Valley|Willamette Valley|        Pinot Noir|               Ponzi|
|     US|This re-named vin...|              Silice|    95|   65|    Or

**16.5.1 Natural Language Processing**

In [None]:
#Natural Language Toolkit library
! pip install nltk
! python -m nltk.downloader popular

[nltk_data] Downloading collection 'popular'
[nltk_data]    | 
[nltk_data]    | Downloading package cmudict to /root/nltk_data...
[nltk_data]    |   Package cmudict is already up-to-date!
[nltk_data]    | Downloading package gazetteers to /root/nltk_data...
[nltk_data]    |   Package gazetteers is already up-to-date!
[nltk_data]    | Downloading package genesis to /root/nltk_data...
[nltk_data]    |   Package genesis is already up-to-date!
[nltk_data]    | Downloading package gutenberg to /root/nltk_data...
[nltk_data]    |   Package gutenberg is already up-to-date!
[nltk_data]    | Downloading package inaugural to /root/nltk_data...
[nltk_data]    |   Package inaugural is already up-to-date!
[nltk_data]    | Downloading package movie_reviews to
[nltk_data]    |     /root/nltk_data...
[nltk_data]    |   Package movie_reviews is already up-to-date!
[nltk_data]    | Downloading package names to /root/nltk_data...
[nltk_data]    |   Package names is already up-to-date!
[nltk_data]    | Do

In [None]:
# get the PoS tags
import nltk
from nltk import word_tokenize
text = word_tokenize("I enjoy biking on the trails")
output = nltk.pos_tag(text)
print(output)

[('I', 'PRP'), ('enjoy', 'VBP'), ('biking', 'VBG'), ('on', 'IN'), ('the', 'DT'), ('trails', 'NNS')]


16.6.1 Tokenize Data

In [None]:
# import the Tokenizer library from pyspark
from pyspark.ml.feature import Tokenizer

In [None]:
# Creat sample DataFrame
dataframe =spark.createDataFrame([
                                  (0, "Spark is great"),
                                  (1, "We are learning Spark"),
                                  (2, "Spark is better than hadoop no doubt")
], ["id", "sentence"])
dataframe.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  0|      Spark is great|
|  1|We are learning S...|
|  2|Spark is better t...|
+---+--------------------+



In [None]:
# Tokenize sentences
tokenizer = Tokenizer (inputCol="sentence", outputCol="words")
tokenizer

Tokenizer_d8ef7d8cf7c2

In [None]:
# Transform and show DataFrame
tokenized_df = tokenizer.transform(dataframe)
tokenized_df.show(truncate=False)

+---+------------------------------------+--------------------------------------------+
|id |sentence                            |words                                       |
+---+------------------------------------+--------------------------------------------+
|0  |Spark is great                      |[spark, is, great]                          |
|1  |We are learning Spark               |[we, are, learning, spark]                  |
|2  |Spark is better than hadoop no doubt|[spark, is, better, than, hadoop, no, doubt]|
+---+------------------------------------+--------------------------------------------+



In [None]:
# Creat a fuction to return the length of a list
def word_list_length(word_list):
  return len(word_list)

In [None]:
# import the udf function, the col function to select a column to be passed into a function,
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [None]:
# Creat a user defined function
count_tokens = udf(word_list_length, IntegerType())

In [None]:
# Creat our Tokenizer
tokenizer = Tokenizer(inputCol = "sentence", outputCol = "words")
# Transform DataFrame
tokenized_df = tokenizer.transform(dataframe)
# Select the needed columns and don't truncate results
tokenized_df.withColumn("tokens", count_tokens(col("words"))).show(truncate=False)

+---+------------------------------------+--------------------------------------------+------+
|id |sentence                            |words                                       |tokens|
+---+------------------------------------+--------------------------------------------+------+
|0  |Spark is great                      |[spark, is, great]                          |3     |
|1  |We are learning Spark               |[we, are, learning, spark]                  |4     |
|2  |Spark is better than hadoop no doubt|[spark, is, better, than, hadoop, no, doubt]|7     |
+---+------------------------------------+--------------------------------------------+------+



**16.6.2 Stop Words**
: Stop words are words that have little or no linguistic value in NLP.
Removing these words from the data can improve the accuracy of the
language model because it removes inessential words.

In [None]:
# Creat DataFrame
sentenceData = spark.createDataFrame([
                                      (0,["Big", "data", "is", "super", "powerful"]),
                                      (1,["This", "is", "going", "to", "be", "epic"])
], ["id", "raw"])
sentenceData.show(truncate=False)

+---+--------------------------------+
|id |raw                             |
+---+--------------------------------+
|0  |[Big, data, is, super, powerful]|
|1  |[This, is, going, to, be, epic] |
+---+--------------------------------+



In [None]:
# Import stop words library
from pyspark.ml.feature import StopWordsRemover

In [None]:
# Run the Remover
remover = StopWordsRemover(inputCol = "raw", outputCol = "filtered")
remover.transform(sentenceData).show(truncate=False)

+---+--------------------------------+----------------------------+
|id |raw                             |filtered                    |
+---+--------------------------------+----------------------------+
|0  |[Big, data, is, super, powerful]|[Big, data, super, powerful]|
|1  |[This, is, going, to, be, epic] |[going, epic]               |
+---+--------------------------------+----------------------------+



**16.6.3 Term Frequency-Inverse Document Frequency Weight**
: a statistical weight showing the importance of a word in a document.

In [None]:
# Start Spark sesion
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TF-IDF").getOrCreate()
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/airlines.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("airlines.csv"), sep=",", header=True)

# show DataFrame
df.show()

+--------------------+
|      Airline Tweets|
+--------------------+
|@VirginAmerica pl...|
|@VirginAmerica se...|
|@VirginAmerica do...|
|@VirginAmerica Ar...|
|@VirginAmerica aw...|
+--------------------+



In [None]:
# Tokenize DataFrame
tokened = Tokenizer(inputCol="Airline Tweets", outputCol="words")
tokened_transformed = tokened.transform(df)
tokened_transformed.show()

+--------------------+--------------------+
|      Airline Tweets|               words|
+--------------------+--------------------+
|@VirginAmerica pl...|[@virginamerica, ...|
|@VirginAmerica se...|[@virginamerica, ...|
|@VirginAmerica do...|[@virginamerica, ...|
|@VirginAmerica Ar...|[@virginamerica, ...|
|@VirginAmerica aw...|[@virginamerica, ...|
+--------------------+--------------------+



In [None]:
# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed_frame = remover.transform(tokened_transformed)
removed_frame.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------

In [None]:
# Run the hashing term frequency
# HashingTF , which converts words to numeric IDs.
# The HashingTF function takes an argument for an input column, an output
# column, and a numFeature parameter, which specifies the number of buckets for the split words.
hashing = HashingTF(inputCol="filtered", outputCol="hashedValues", numFeatures=pow(2,18))

# transform into a DF
hashed_df = hashing.transform(removed_frame)
hashed_df.show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+
|Airline Tweets                                                                                                                         |words                                                                                                                                                          |filtered                                                                                       |hashedValues                                                             

In [None]:
# Fit the IDF on the data set
# IDF Modeles  will scale the values while down-weighting based on document frequency.
idf = IDF(inputCol="hashedValues", outputCol="features")
idfModel = idf.fit(hashed_df)
rescaledData = idfModel.transform(hashed_df)

# Display the DataFrame
rescaledData.select("words", "features").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|words                                                                                                                                                          |features                                                                                                                                                                                                                                                                                                        |
+-----------------------------------------------------------------

**16.6.4 Pipeline Setup to Run the Model**

In [None]:
# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Yelp_NLP").getOrCreate()

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+--------+--------------------+
|   class|                text|
+--------+--------------------+
|positive|Wow... Loved this...|
|negative|  Crust is not good.|
|negative|Not tasty and the...|
|positive|Stopped by during...|
|positive|The selection on ...|
|negative|Now I am getting ...|
|negative|Honeslty it didn'...|
|negative|The potatoes were...|
|positive|The fries were gr...|
|positive|      A great touch.|
|positive|Service was very ...|
|negative|  Would not go back.|
|negative|The cashier had n...|
|positive|I tried the Cape ...|
|negative|I was disgusted b...|
|negative|I was shocked bec...|
|positive| Highly recommended.|
|negative|Waitress was a li...|
|negative|This place is not...|
|negative|did not like at all.|
+--------+--------------------+
only showing top 20 rows



In [None]:
# create a new column that uses the length function to create a future feature with the length of each row.
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

# create a new column that uses the length function to create a future feature with the length of each row.
from pyspark.sql.functions import length
# Creat a length column to be used as a future feature
data_df = df.withColumn('length', length(df['text']))
data_df.show()

+--------+--------------------+------+
|   class|                text|length|
+--------+--------------------+------+
|positive|Wow... Loved this...|    24|
|negative|  Crust is not good.|    18|
|negative|Not tasty and the...|    41|
|positive|Stopped by during...|    87|
|positive|The selection on ...|    59|
|negative|Now I am getting ...|    46|
|negative|Honeslty it didn'...|    37|
|negative|The potatoes were...|   111|
|positive|The fries were gr...|    25|
|positive|      A great touch.|    14|
|positive|Service was very ...|    24|
|negative|  Would not go back.|    18|
|negative|The cashier had n...|    99|
|positive|I tried the Cape ...|    59|
|negative|I was disgusted b...|    62|
|negative|I was shocked bec...|    50|
|positive| Highly recommended.|    19|
|negative|Waitress was a li...|    38|
|negative|This place is not...|    51|
|negative|did not like at all.|    20|
+--------+--------------------+------+
only showing top 20 rows



In [None]:
# create all the transformations to be applied in our pipeline. 
# Note that the StringIndexer encodes a string column to a column of table
# indexes. Here we are working with positive and negative game reviews, which will be converted to 0 and 1.

# Creat all the features to the data set
pos_neg_to_num = StringIndexer (inputCol='class', outputCol='label')
tokenizer = Tokenizer(inputCol="text", outputCol="token_text")
stopremove = StopWordsRemover(inputCol="token_text", outputCol="sotp_tokens")
hashingTF = HashingTF(inputCol="sotp_tokens", outputCol="hash_token")
idf = IDF(inputCol="hash_token", outputCol='idf_token')

In [None]:
# create a feature vector containing the output from the IDFModel (the last stage in the pipeline) and the length.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
# Creat feature vectors
clean_up = VectorAssembler(inputCols=["idf_token", "length"], outputCol="features")

In [None]:
# We'll import the pipeline from pyspark.ml , and then store a list of the stages created  earlier.
# Creat and run a data procession Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[pos_neg_to_num, tokenizer, stopremove, hashingTF, idf, clean_up])

**16.6.5 Run the Model**

In [None]:
# fit and transform the pipeline
cleaner = data_prep_pipeline.fit(data_df)
cleaned = cleaner.transform(data_df)
cleaned.select(["label", "features"]).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|(262145,[177414,2...|
|  0.0|(262145,[49815,23...|
|  0.0|(262145,[109649,1...|
|  1.0|(262145,[53101,68...|
|  1.0|(262145,[15370,77...|
|  0.0|(262145,[98142,13...|
|  0.0|(262145,[59172,22...|
|  0.0|(262145,[63420,85...|
|  1.0|(262145,[53777,17...|
|  1.0|(262145,[221827,2...|
|  1.0|(262145,[43756,22...|
|  0.0|(262145,[127310,1...|
|  0.0|(262145,[407,3153...|
|  1.0|(262145,[18098,93...|
|  0.0|(262145,[23071,12...|
|  0.0|(262145,[129941,1...|
|  1.0|(262145,[19633,21...|
|  0.0|(262145,[27707,65...|
|  0.0|(262145,[20891,27...|
|  0.0|(262145,[8287,208...|
+-----+--------------------+
only showing top 20 rows



In [None]:
# split the data into a training set and a testing set, run the following code:
# So 70% to training and 30% to testing. 
# The second number supplied is called a seed. The seed number here, 21, is arbitrary.
# Break data down into a training set and a testing set
training, testing = cleaned.randomSplit([0.7, 0.3], 21)


In [None]:
# ML model we'll use is Naive Bayes, which we'll import and then fit the model using the training dataset
# Naive Bayes is a group of classifier algorithms based on Bayes' theorem.
from pyspark.ml.classification import NaiveBayes
# Creat a Naivy Bayes model and fit traning data
nb = NaiveBayes()
predictor = nb.fit(training)

# Transform the model with the testing data
# prediction column will indicate with a 1.0 if the model thinks this
# review is negative and 0.0 if it thinks it's positive. Future data sets can now
# be run with this model and determine whether a review was positive or
# negative without having already supplied in the data.

test_results = predictor.transform(testing)
test_results.show(5)

+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|   class|                text|length|label|          token_text|         sotp_tokens|          hash_token|           idf_token|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|negative|"The burger... I ...|    86|  0.0|["the, burger...,...|["the, burger...,...|(262144,[20298,21...|(262144,[20298,21...|(262145,[20298,21...|[-820.60780566975...|[0.99999999999995...|       0.0|
|negative|              #NAME?|     6|  0.0|            [#name?]|            [#name?]|(262144,[197050],...|(262144,[197050],...|(262145,[197050,2...|[-73.489435340867...|[0.07515735596910.

In [None]:
# The last step is to import the BinaryClassificationEvaluator , which
# will display how accurate our model is in determining if a review with be
# positive or negative based solely on the text within a review.

# The BinaryClassificationEvaluator uses two arguments, labelCol and rawPredictionCol.
# The labelCol takes the labels which were the result of using StringIndexer to convert our positive and negative strings to
# integers. The rawPredictionCol takes in numerical predictions from the
# output of running the Naive Bayes model.

from pyspark.ml.evaluation import BinaryClassificationEvaluator
acc_eval = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.700298


**16.9.1 PySpark ETL**

In [None]:
!wget http://jdbc.postgresql.org/download/postgresql-42.2.16.jar

URL transformed to HTTPS due to an HSTS policy
--2021-01-27 05:23:41--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2021-01-27 05:23:43 (1.21 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CloudETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [None]:
# Read in Data from S3 Buckets
from pyspark import SparkFiles
url ="https://sibbuckettest.s3.us-east-2.amazonaws.com/user_data.csv"
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(SparkFiles.get("user_data.csv"), sep=",", header=True, inferSchema=True)

# Show DataFrame
user_data_df.show()

+---+----------+---------+-----------+--------------------+--------------------+------------+
| id|first_name|last_name|active_user|      street_address|               state|    username|
+---+----------+---------+-----------+--------------------+--------------------+------------+
|  1|    Cletus|  Lithcow|      false| 78309 Riverside Way|            Virginia|   ibearham0|
|  2|       Caz|   Felgat|      false| 83 Hazelcrest Place|             Alabama|    wwaller1|
|  3|     Kerri|  Crowson|      false|      112 Eliot Pass|      North Carolina|   ichesnut2|
|  4|   Freddie|    Caghy|      false|     15 Merchant Way|            New York|     tsnarr3|
|  5|   Sadella|    Deuss|      false|    079 Acker Avenue|           Tennessee|   fwherrit4|
|  6|    Fraser|  Korneev|       true|  76084 Novick Court|           Minnesota|  fstappard5|
|  7|    Demott|   Rapson|       true|    86320 Dahle Park|District of Columbia|  lhambling6|
|  8|    Robert|    Poile|      false|1540 Manitowish Hill| 

In [None]:
url ="https://sibbuckettest.s3.us-east-2.amazonaws.com/user_payment.csv"
spark.sparkContext.addFile(url)
user_payment_df = spark.read.csv(SparkFiles.get("user_payment.csv"), sep=",", header=True, inferSchema=True)
# Show DataFrame
user_payment_df.show()

+----------+------------+--------------------+
|billing_id|    username|        cc_encrypted|
+----------+------------+--------------------+
|         1|   ibearham0|a799fcafe47d7fb19...|
|         2|    wwaller1|a799fcafe47d7fb19...|
|         3|   ichesnut2|a799fcafe47d7fb19...|
|         4|     tsnarr3|a799fcafe47d7fb19...|
|         5|   fwherrit4|a799fcafe47d7fb19...|
|         6|  fstappard5|a799fcafe47d7fb19...|
|         7|  lhambling6|a799fcafe47d7fb19...|
|         8|      drude7|a799fcafe47d7fb19...|
|         9|   bspawton8|a799fcafe47d7fb19...|
|        10| rmackeller9|a799fcafe47d7fb19...|
|        11| cdennerleya|a799fcafe47d7fb19...|
|        12|    gsarfasb|a799fcafe47d7fb19...|
|        13| mpichefordc|a799fcafe47d7fb19...|
|        14|     bingryd|a799fcafe47d7fb19...|
|        15|   wheinerte|a799fcafe47d7fb19...|
|        16|    mdrewetf|a799fcafe47d7fb19...|
|        17|droughsedgeg|a799fcafe47d7fb19...|
|        18|     abaakeh|a799fcafe47d7fb19...|
|        19| 

**Transform**

In [None]:
# Join the two tables
joined_df = user_data_df.join(user_payment_df, on ="username", how="inner")
joined_df.show()

+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|    username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|   ibearham0|  1|    Cletus|  Lithcow|      false| 78309 Riverside Way|            Virginia|         1|a799fcafe47d7fb19...|
|    wwaller1|  2|       Caz|   Felgat|      false| 83 Hazelcrest Place|             Alabama|         2|a799fcafe47d7fb19...|
|   ichesnut2|  3|     Kerri|  Crowson|      false|      112 Eliot Pass|      North Carolina|         3|a799fcafe47d7fb19...|
|     tsnarr3|  4|   Freddie|    Caghy|      false|     15 Merchant Way|            New York|         4|a799fcafe47d7fb19...|
|   fwherrit4|  5|   Sadella|    Deuss|      false|    079 Acker Avenue|           Tennessee|         5|a799fcafe47d7f

In [None]:
# Drop any rows with null
dropna_df = joined_df.dropna()
dropna_df.show()

+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|    username| id|first_name|last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+------------+---+----------+---------+-----------+--------------------+--------------------+----------+--------------------+
|   ibearham0|  1|    Cletus|  Lithcow|      false| 78309 Riverside Way|            Virginia|         1|a799fcafe47d7fb19...|
|    wwaller1|  2|       Caz|   Felgat|      false| 83 Hazelcrest Place|             Alabama|         2|a799fcafe47d7fb19...|
|   ichesnut2|  3|     Kerri|  Crowson|      false|      112 Eliot Pass|      North Carolina|         3|a799fcafe47d7fb19...|
|     tsnarr3|  4|   Freddie|    Caghy|      false|     15 Merchant Way|            New York|         4|a799fcafe47d7fb19...|
|   fwherrit4|  5|   Sadella|    Deuss|      false|    079 Acker Avenue|           Tennessee|         5|a799fcafe47d7f

In [None]:
# Filter for active users
# Load in a sql function to use columns
from pyspark.sql.functions import col 
# Filter for only columns with active users
cleaned_df = dropna_df.filter (col("active_user") == True)
cleaned_df.show()

+-------------+---+----------+-----------+-----------+--------------------+--------------------+----------+--------------------+
|     username| id|first_name|  last_name|active_user|      street_address|               state|billing_id|        cc_encrypted|
+-------------+---+----------+-----------+-----------+--------------------+--------------------+----------+--------------------+
|   fstappard5|  6|    Fraser|    Korneev|       true|  76084 Novick Court|           Minnesota|         6|a799fcafe47d7fb19...|
|   lhambling6|  7|    Demott|     Rapson|       true|    86320 Dahle Park|District of Columbia|         7|a799fcafe47d7fb19...|
|    wheinerte| 15|   Sadella|      Jaram|       true|7528 Waxwing Terrace|         Connecticut|        15|a799fcafe47d7fb19...|
| droughsedgeg| 17|    Hewitt|    Trammel|       true|    2455 Corry Alley|      North Carolina|        17|a799fcafe47d7fb19...|
|    ydudeniei| 19|       Ted|    Knowlys|       true|      31 South Drive|                Ohio| 

In [None]:
# Creat user dataframe to match active_user table
clean_user_df = cleaned_df.select(["id", "first_name", "last_name", "username"])
clean_user_df.show()

+---+----------+-----------+-------------+
| id|first_name|  last_name|     username|
+---+----------+-----------+-------------+
|  6|    Fraser|    Korneev|   fstappard5|
|  7|    Demott|     Rapson|   lhambling6|
| 15|   Sadella|      Jaram|    wheinerte|
| 17|    Hewitt|    Trammel| droughsedgeg|
| 19|       Ted|    Knowlys|    ydudeniei|
| 23|  Annmarie|     Lafond|     fmyttonm|
| 28|      Toma|     Sokell|   bfletcherr|
| 30|       Ram|    Lefever|     gturleyt|
| 31|    Raddie|    Heindle|    calyukinu|
| 33|    Wallie|       Caws| ckleinlererw|
| 34|    Derril|Varfolomeev|  pshanklandx|
| 39|     Kelcy|     Wheway|    enelane12|
| 40|    Dorree|    Rookeby|    sfollet13|
| 41|    Martyn|       Tott|      mtesh14|
| 43|     Cally|      Thody|   tseyfart16|
| 45|       Ted|   Pittaway|   hfarrier18|
| 48|      Fifi|    Lidgley|     nabbie1b|
| 50|    Ashely|     O'Hern|  ystadding1d|
| 53|   Diannne|Osbaldeston|hhallgalley1g|
| 60|     Sonny|     Jeskin|   ageaveny1n|
+---+------

In [None]:
# Create a DataFrame to match the payment_info table
clean_billing_df = cleaned_df.select(["billing_id", "street_address", "state", "username"])
clean_billing_df.show()

+----------+--------------------+--------------------+-------------+
|billing_id|      street_address|               state|     username|
+----------+--------------------+--------------------+-------------+
|         6|  76084 Novick Court|           Minnesota|   fstappard5|
|         7|    86320 Dahle Park|District of Columbia|   lhambling6|
|        15|7528 Waxwing Terrace|         Connecticut|    wheinerte|
|        17|    2455 Corry Alley|      North Carolina| droughsedgeg|
|        19|      31 South Drive|                Ohio|    ydudeniei|
|        23|     35 Oriole Place|             Georgia|     fmyttonm|
|        28|39641 Eggendart Hill|            Maryland|   bfletcherr|
|        30|   9969 Laurel Alley|               Texas|     gturleyt|
|        31|   811 Talmadge Road|                Ohio|    calyukinu|
|        33|   9999 Kenwood Pass|              Oregon| ckleinlererw|
|        34|     4 Jenifer Court|             Florida|  pshanklandx|
|        39|93207 Morningstar...| 

In [None]:
# Create a DataFrame to match the payment_info table
clean_payment_df = cleaned_df.select(["billing_id", "cc_encrypted"])
clean_payment_df.show()

+----------+--------------------+
|billing_id|        cc_encrypted|
+----------+--------------------+
|         6|a799fcafe47d7fb19...|
|         7|a799fcafe47d7fb19...|
|        15|a799fcafe47d7fb19...|
|        17|a799fcafe47d7fb19...|
|        19|a799fcafe47d7fb19...|
|        23|a799fcafe47d7fb19...|
|        28|a799fcafe47d7fb19...|
|        30|a799fcafe47d7fb19...|
|        31|a799fcafe47d7fb19...|
|        33|a799fcafe47d7fb19...|
|        34|a799fcafe47d7fb19...|
|        39|a799fcafe47d7fb19...|
|        40|a799fcafe47d7fb19...|
|        41|a799fcafe47d7fb19...|
|        43|a799fcafe47d7fb19...|
|        45|a799fcafe47d7fb19...|
|        48|a799fcafe47d7fb19...|
|        50|a799fcafe47d7fb19...|
|        53|a799fcafe47d7fb19...|
|        60|a799fcafe47d7fb19...|
+----------+--------------------+
only showing top 20 rows



**Load:**
final step is to get our transformed raw data into our database.


In [None]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://amazonuserdb.ckl1urlayos2.us-east-2.rds.amazonaws.com:5432/PracticeDB"
config = {"user": "postgres",
          "password": "hwmqNEtVfE30cMEcDTLu",
          "driver":"org.postgresql.Driver"}



In [None]:
clean_user_df.write.jdbc(url=jdbc_url, table='active_user', mode=mode, properties=config)

In [None]:
clean_billing_df.write.jdbc(url=jdbc_url, table='billing_info', mode=mode, properties=config)

In [None]:
clean_payment_df.write.jdbc(url=jdbc_url, table='payment_info', mode=mode, properties=config)