<a href="https://colab.research.google.com/github/karenbennis/Xy/blob/storyboard/pyspark_to_pandas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<br>**A notebook**<br><br>

In [None]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

#Interact with SQL
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

# Start Spark Session(Creating spark application with name defined by appName()) ---IMPORTED WITH EVERY COLAB NOTEBOOK
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("database_transformation").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()


--2020-07-20 17:48:27--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-07-20 17:48:28 (1.42 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
# Log in to gcloud, select the 'xy-yelp' project, and print the Cloud SQL instance details.
# The describe output is where the proxy cell's "connectionName" is taken from.
!gcloud auth login
!gcloud config set project 'xy-yelp'
!gcloud sql instances describe 'xy-yelp'

In [None]:
# Download the Cloud SQL proxy binary and make it executable
!wget https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64 -O cloud_sql_proxy
!chmod +x cloud_sql_proxy
# "connectionName" comes from the `gcloud sql instances describe` output in the previous cell.
# Run the proxy in the background (nohup ... &), exposing the instance on local TCP port 5432.
!nohup ./cloud_sql_proxy -instances="xy-yelp:northamerica-northeast1:xy-yelp"=tcp:5432 &
# Pause for 30 seconds — presumably to let the proxy come up before the first connection
!sleep 30s

--2020-07-19 18:17:05--  https://dl.google.com/cloudsql/cloud_sql_proxy.linux.amd64
Resolving dl.google.com (dl.google.com)... 108.177.13.93, 108.177.13.136, 108.177.13.190, ...
Connecting to dl.google.com (dl.google.com)|108.177.13.93|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14492253 (14M) [application/octet-stream]
Saving to: ‘cloud_sql_proxy’


2020-07-19 18:17:05 (301 MB/s) - ‘cloud_sql_proxy’ saved [14492253/14492253]

nohup: appending output to 'nohup.out'


In [None]:
# Read the database password at runtime instead of committing it to the notebook.
# Set XY_YELP_DB_PASSWORD in the environment to skip the interactive prompt.
# NOTE(review): the password that was previously committed here should be rotated.
import os
from getpass import getpass

db_password = os.environ.get("XY_YELP_DB_PASSWORD") or getpass("Postgres password: ")

In [None]:
# JDBC connection settings for the Postgres database reached via local port 5432
jdbc_url = "jdbc:postgresql://127.0.0.1:5432/xy_yelp_db"
# Save mode string; not referenced by any of the cells visible in this notebook section
mode = "append"
config = dict(
    user="postgres",
    password=db_password,
    driver="org.postgresql.Driver",
)

**Extract tables**

In [None]:
# Load the `review` table over JDBC and preview the first five rows
review_df2 = spark.read.jdbc(jdbc_url, "review", properties=config)
review_df2.show(n=5)

+--------------------+--------------------+-----+----+------+-----+-----------+-----------+
|           review_id|         review_text|stars|cool|useful|funny|review_date|review_type|
+--------------------+--------------------+-----+----+------+-----+-----------+-----------+
|cALYebKb5hygdKHql...|This is a very in...|    4|   0|     0|    0| 2011-01-12|     review|
|SawdMXLYD5ytRmMFv...|I LOVE Chic Nails...|    5|   0|     2|    0| 2011-01-20|     review|
|j-jMQdELr6AFkCcEH...|After the Padres ...|    5|   0|     0|    0| 2011-01-06|     review|
|SmUMyCUNrT9HEo_DX...|I have to admit t...|    4|   0|     1|    0| 2010-01-17|     review|
|oTB_mpCKcu-8wayQQ...|Best food, super ...|    5|   0|     1|    0| 2011-01-14|     review|
+--------------------+--------------------+-----+----+------+-----+-----------+-----------+
only showing top 5 rows



In [None]:
# Load the `business` table (review_id -> business_id) over JDBC and preview five rows
business_df2 = spark.read.jdbc(jdbc_url, "business", properties=config)
business_df2.show(n=5)

+--------------------+--------------------+
|           review_id|         business_id|
+--------------------+--------------------+
|fWKvX83p0-ka4JS3d...|9yKzy9PApeiPPOUJE...|
|IjZ33sJrzXqU-0X6U...|ZRJwVLyzEJq1VAihD...|
|IESLBzqUCLdSzSqm0...|6oRAC4uyJCsJl1X0W...|
|G-WvGaISbqqaMHlNn...|_1QQZuf4zZOyFCvXc...|
|1uJFq2r5QfJG_6ExM...|6ozycU1RpktNG2-1B...|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Load the `yelp_user` table (review_id -> user_id) over JDBC and preview five rows
user_df2 = spark.read.jdbc(jdbc_url, "yelp_user", properties=config)
user_df2.show(n=5)

+--------------------+--------------------+
|           review_id|             user_id|
+--------------------+--------------------+
|GJGUHAAONtBSBj53c...|Z3c7xGRfeV-uMkSbA...|
|nQH2KAvAeOJOYKX99...|ryjqXdp68i2I9JPOp...|
|-yKcbjWSlmKC1zTMT...|5W-ruHmpkwLyI6Lla...|
|20aES_-g5Vyqfzojn...|vhxFLqRok6r-D_aQz...|
|W_d9w7yr3koSUXHco...|aBnKTxZzdhabTXfzt...|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Attach business_id and user_id to every review with inner joins on review_id
spark_df = (
    review_df2
    .join(business_df2, on="review_id", how="inner")
    .join(user_df2, on="review_id", how="inner")
)
spark_df.show(5)

+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+
|           review_id|         review_text|stars|cool|useful|funny|review_date|review_type|         business_id|             user_id|
+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+
|-7yxrdY13ay15rGB7...|I have been going...|    5|   0|     0|    0| 2010-01-16|     review|Lh9nz0KYyzE-YRbKu...|ayKW9eWwGFcrtJaHc...|
|-Be0UUGYuiDJVAM_Y...|Since Im big into...|    4|   0|     2|    2| 2011-01-25|     review|pa6K7DGByxBXxcVJ5...|_4lqpCYCqOQzbB6xQ...|
|-nQHHXi-d_yuW301_...|A pleasant place ...|    2|   0|     0|    0| 2011-01-12|     review|GIGI8bJfN6HyPzmEW...|4QORbyhfN01oKR_Gg...|
|2L30O7G8IQ6HILpR0...|part of a social ...|    5|   0|     0|    0| 2010-01-24|     review|qiwajZigq_2twTmYo...|ST8Yzlk2MqKlcaLqL...|
|4x5yLG7_yGLuN-w6f...|I love every plac...|    4|   0|     1| 

**Transformation**

In [None]:
import pyspark.sql.functions as F

# Add a `length` column holding the character count of each review's text
spark_df = spark_df.withColumn("length", F.length(F.col("review_text")))

In [None]:
# Binary target column: 1 when the review has more than 3 stars, 0 otherwise
above_three_stars = F.col("stars") > 3
spark_df = spark_df.withColumn("class", F.when(above_three_stars, 1).otherwise(0))
spark_df.show()

+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+------+-----+
|           review_id|         review_text|stars|cool|useful|funny|review_date|review_type|         business_id|             user_id|length|class|
+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+------+-----+
|-7yxrdY13ay15rGB7...|I have been going...|    5|   0|     0|    0| 2010-01-16|     review|Lh9nz0KYyzE-YRbKu...|ayKW9eWwGFcrtJaHc...|   670|    1|
|-Be0UUGYuiDJVAM_Y...|Since Im big into...|    4|   0|     2|    2| 2011-01-25|     review|pa6K7DGByxBXxcVJ5...|_4lqpCYCqOQzbB6xQ...|  1348|    1|
|-nQHHXi-d_yuW301_...|A pleasant place ...|    2|   0|     0|    0| 2011-01-12|     review|GIGI8bJfN6HyPzmEW...|4QORbyhfN01oKR_Gg...|   813|    0|
|2L30O7G8IQ6HILpR0...|part of a social ...|    5|   0|     0|    0| 2010-01-24|     review|qiwajZigq_2twTmYo...|ST8Yzl

<br>**Pipeline**<br><br>

In [None]:
# Import functions
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer

In [None]:
from pyspark.sql.functions import col, split

# Wrap each star rating in a one-element array: splitting on a space leaves the
# whole value as a single element (e.g. [5]), which is the array input the
# CountVectorizer cell expects.
spark_df = spark_df.withColumn(
    "star_array",
    split(col("stars"), " "),
)
spark_df.show()

In [None]:
# Initialize a CountVectorizer that encodes star_array into the stars_one_hot vector
from pyspark.ml.feature import CountVectorizer
star_vectorizer = CountVectorizer(
    inputCol="star_array",
    outputCol="stars_one_hot",
    vocabSize=5,
    minDF=1.0,
)

In [None]:
# Fit the vectorizer on the joined data to learn the star-rating vocabulary
star_vector_model = star_vectorizer.fit(dataset=spark_df)

In [None]:
# Add the one-hot encoded `stars_one_hot` column.
# The vector index follows the fitted vocabulary, not the star value: in the
# recorded output 4 stars maps to index 0 and 5 stars to index 1.
df_ohe = star_vector_model.transform(dataset=spark_df)
df_ohe.show(n=3)

+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+------+-----+----------+-------------+
|           review_id|         review_text|stars|cool|useful|funny|review_date|review_type|         business_id|             user_id|length|class|star_array|stars_one_hot|
+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+------+-----+----------+-------------+
|-7yxrdY13ay15rGB7...|I have been going...|    5|   0|     0|    0| 2010-01-16|     review|Lh9nz0KYyzE-YRbKu...|ayKW9eWwGFcrtJaHc...|   670|    1|       [5]|(5,[1],[1.0])|
|-Be0UUGYuiDJVAM_Y...|Since Im big into...|    4|   0|     2|    2| 2011-01-25|     review|pa6K7DGByxBXxcVJ5...|_4lqpCYCqOQzbB6xQ...|  1348|    1|       [4]|(5,[0],[1.0])|
|-nQHHXi-d_yuW301_...|A pleasant place ...|    2|   0|     0|    0| 2011-01-12|     review|GIGI8bJfN6HyPzmEW...|4QORbyhfN01oKR_Gg...|   813|

In [None]:
# Text-feature stages: tokenize -> remove stop words -> hashed term frequencies -> IDF weighting
#star_rating = StringIndexer(inputCol='stars_one_hot',outputCol='label')
tokenizer = Tokenizer(
    inputCol="review_text",
    outputCol="token_text",
)
stopremove = StopWordsRemover(
    inputCol="token_text",
    outputCol="stop_tokens",
)
hashingTF = HashingTF(
    inputCol="stop_tokens",
    outputCol="hash_token",
)
idf = IDF(
    inputCol="hash_token",
    outputCol="idf_token",
)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

# Assemble the IDF-weighted token vector and the review length into one `features` vector
clean_up = VectorAssembler(
    inputCols=["idf_token", "length"],
    outputCol="features",
)

In [None]:
from pyspark.ml import Pipeline

# Chain the text stages and the assembler into one preprocessing pipeline
# (this cell only builds it; fitting happens in the next cell)
data_prep_pipeline = Pipeline(
    stages=[
        tokenizer,
        stopremove,
        hashingTF,
        idf,
        clean_up,
    ]
)

In [None]:
# Fit the pipeline on the encoded DataFrame, then apply the fitted model to that same DataFrame
cleaner = data_prep_pipeline.fit(dataset=df_ohe)
cleaned = cleaner.transform(dataset=df_ohe)

In [None]:
#@title
# Preview the pipeline output, including the new token/hash/idf/features columns
cleaned.show()

In [None]:
# Preview the pipeline output again (repeats the call made in the previous cell)
cleaned.show()

+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+------+-----+----------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|           review_id|         review_text|stars|cool|useful|funny|review_date|review_type|         business_id|             user_id|length|class|star_array|stars_one_hot|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+--------------------+-----+----+------+-----+-----------+-----------+--------------------+--------------------+------+-----+----------+-------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|-7yxrdY13ay15rGB7...|I have been going...|    5|   0|     0|    0| 2010-01-16|     review|Lh9nz0KYyzE-YRbKu...|ayKW9eWwGFcrtJaHc...|   670|    1|       [5]|(5,[1],[1.0]

In [None]:
# Keep only what is exported: the feature vector plus both forms of the star label
x = cleaned.select(
    "features",
    "stars_one_hot",
    "stars",
)

In [None]:
# Break data down into a training set and a testing set
#training, testing = x.randomSplit([0.7, 0.3], 21)

In [None]:
#training.show()

In [None]:
# Authenticate this Colab session with Google
# (presumably so the gcloud / gsutil cells that follow can reach the project and bucket)
from google.colab import auth
auth.authenticate_user()

In [None]:
# Google Cloud project id, interpolated into the gcloud command in the next cell
project_id = "xy-yelp"

In [None]:
# Make `project_id` the active gcloud project ({project_id} is interpolated by IPython)
!gcloud config set project {project_id}

Updated property [core/project].


In [None]:
# Local output path for the JSON export of the Spark DataFrame
filepath = "/tmp/ml_j.json"

In [None]:
# Save the JSON file. NOTE: this write fails if the output path already exists, which it does at this point, so it is commented out for now.
#x.coalesce(1).write.format('json').save(filepath)

In [None]:
# Name of the Cloud Storage bucket that holds the exported JSON files
bucket_name = "xy-bucket"

In [None]:
# Copy saved file from /tmp to bucket
#!gsutil cp -r /tmp/ml_j.json/ gs://{bucket_name}/json_files/

Copying file:///tmp/ml_j.json/_SUCCESS [Content-Type=application/octet-stream]...
Copying file:///tmp/ml_j.json/.part-00000-92b085ac-0a90-49de-85dd-2f1b433007ee-c000.json.crc [Content-Type=application/octet-stream]...
Copying file:///tmp/ml_j.json/._SUCCESS.crc [Content-Type=application/octet-stream]...
Copying file:///tmp/ml_j.json/part-00000-92b085ac-0a90-49de-85dd-2f1b433007ee-c000.json [Content-Type=application/json]...
- [4 files][ 16.1 MiB/ 16.1 MiB]                                                
Operation completed over 4 objects/16.1 MiB.                                     


In [None]:
# Get file which was saved to bucket **** file saves as multiple files so this should be addressed in the future, for now the 11mb file should be fine
!gsutil cp -r gs://xy-bucket/json_files/ml_j.json/part-00000-92b085ac-0a90-49de-85dd-2f1b433007ee-c000.json /tmp/machine_json.json

Copying gs://xy-bucket/json_files/ml_j.json/part-00000-92b085ac-0a90-49de-85dd-2f1b433007ee-c000.json...
/ [1 files][ 16.0 MiB/ 16.0 MiB]                                                
Operation completed over 1 objects/16.0 MiB.                                     


In [None]:
# Point `filepath` at the copy downloaded from the bucket.
# The bucket round trip is not strictly needed; it is kept to confirm that the
# notebook can both save to and read from the bucket.
filepath = "/tmp/machine_json.json"

In [None]:
import pandas as pd

# Parse the line-delimited JSON export (one record per line) into a pandas DataFrame
hope_this_works = pd.read_json(path_or_buf=filepath, lines=True)

In [None]:
# Preview the first five rows of the loaded DataFrame
hope_this_works.head(n=5)

Unnamed: 0,features,stars_one_hot,stars
0,"{'type': 0, 'size': 262145, 'indices': [9521, ...","{'type': 0, 'size': 5, 'indices': [1], 'values...",5
1,"{'type': 0, 'size': 262145, 'indices': [78, 31...","{'type': 0, 'size': 5, 'indices': [0], 'values...",4
2,"{'type': 0, 'size': 262145, 'indices': [1846, ...","{'type': 0, 'size': 5, 'indices': [3], 'values...",2
3,"{'type': 0, 'size': 262145, 'indices': [8804, ...","{'type': 0, 'size': 5, 'indices': [1], 'values...",5
4,"{'type': 0, 'size': 262145, 'indices': [17141,...","{'type': 0, 'size': 5, 'indices': [0], 'values...",4


In [None]:
# Print the untruncated first cell (row 0 of `features`) to see how the Spark
# vector was serialized: a dict with type, size, indices and values
thing = hope_this_works.iloc[0, 0]
print(thing)

{'type': 0, 'size': 262145, 'indices': [9521, 13381, 24113, 34146, 47205, 48870, 52657, 55639, 69793, 72944, 74473, 78329, 79660, 80245, 82321, 82582, 87910, 89717, 95454, 95502, 101376, 107367, 109230, 113458, 113462, 115157, 117481, 118144, 120391, 125372, 128924, 130707, 132270, 132538, 134691, 137431, 138356, 141407, 154186, 157120, 168385, 181758, 184251, 188828, 192137, 195155, 208258, 216432, 218117, 221790, 222394, 229772, 232685, 235700, 236821, 240976, 245044, 262144], 'values': [13.146379889022644, 4.547001272864449, 1.6436118877681851, 12.331835858506187, 10.720585530532915, 4.213228093212401, 6.90785527398247, 2.310717259691643, 8.111828078308406, 7.824146005856625, 3.139702638974026, 2.464028238403142, 7.418680897748461, 4.625472888305944, 4.213228093212401, 13.004780331748613, 6.725533717188516, 2.067612965814895, 8.517293186416572, 8.517293186416572, 3.973998404146567, 2.415853786100017, 10.014967732503301, 2.852598100468416, 5.083305981931424, 3.5302677579594492, 3.249

In [None]:
# Expand each `features` dict into one column per key (type, size, indices, values)
features_series = hope_this_works["features"].apply(pd.Series)

In [None]:
# Remove `type`, an extra key introduced by the JSON conversion
features_series = features_series.drop(columns="type")

In [None]:
# Confirm the remaining columns: size, indices, values
features_series.head(n=3)

Unnamed: 0,size,indices,values
0,262145,"[9521, 13381, 24113, 34146, 47205, 48870, 5265...","[13.146379889022644, 4.547001272864449, 1.6436..."
1,262145,"[78, 3188, 4200, 4821, 5381, 5947, 8391, 8527,...","[3.9846936932633152, 8.111828078308406, 2.9151..."
2,262145,"[1846, 4106, 7917, 8287, 8630, 8769, 13677, 15...","[11.89024888444809, 6.812545094178145, 4.77962..."


In [None]:
# Start the feature DataFrame as an independent copy of features_series
new_df = features_series.copy(deep=True)

In [None]:
# Pair each row's indices with its values as an (indices, values) tuple.
# `size` is not part of the tuple; it stays in its own column.
new_df["features"] = [
    (row_indices, row_values)
    for row_indices, row_values in zip(features_series["indices"], features_series["values"])
]

In [None]:
# Preview the new `features` tuple column next to its source columns
new_df.head(n=5)

Unnamed: 0,size,indices,values,features
0,262145,"[9521, 13381, 24113, 34146, 47205, 48870, 5265...","[13.146379889022644, 4.547001272864449, 1.6436...","([9521, 13381, 24113, 34146, 47205, 48870, 526..."
1,262145,"[78, 3188, 4200, 4821, 5381, 5947, 8391, 8527,...","[3.9846936932633152, 8.111828078308406, 2.9151...","([78, 3188, 4200, 4821, 5381, 5947, 8391, 8527..."
2,262145,"[1846, 4106, 7917, 8287, 8630, 8769, 13677, 15...","[11.89024888444809, 6.812545094178145, 4.77962...","([1846, 4106, 7917, 8287, 8630, 8769, 13677, 1..."
3,262145,"[8804, 19862, 30006, 40337, 46639, 47032, 5099...","[6.593874722676491, 5.036053097080879, 2.57975...","([8804, 19862, 30006, 40337, 46639, 47032, 509..."
4,262145,"[17141, 24145, 35715, 48549, 54961, 61231, 755...","[5.259196648395089, 3.550958151216895, 4.47424...","([17141, 24145, 35715, 48549, 54961, 61231, 75..."


In [None]:
# `indices` and `values` now live inside `features`, so drop the standalone columns
new_df = new_df.drop(columns=["indices", "values"])

In [None]:
# Inspect the result of the drop.
# NOTE(review): the recorded output still lists `indices` and `values`, so it
# appears to predate the drop in the previous cell.
new_df.head(n=5)

Unnamed: 0,size,indices,values,features
0,262145,"[9521, 13381, 24113, 34146, 47205, 48870, 5265...","[13.146379889022644, 4.547001272864449, 1.6436...","([9521, 13381, 24113, 34146, 47205, 48870, 526..."
1,262145,"[78, 3188, 4200, 4821, 5381, 5947, 8391, 8527,...","[3.9846936932633152, 8.111828078308406, 2.9151...","([78, 3188, 4200, 4821, 5381, 5947, 8391, 8527..."
2,262145,"[1846, 4106, 7917, 8287, 8630, 8769, 13677, 15...","[11.89024888444809, 6.812545094178145, 4.77962...","([1846, 4106, 7917, 8287, 8630, 8769, 13677, 1..."
3,262145,"[8804, 19862, 30006, 40337, 46639, 47032, 5099...","[6.593874722676491, 5.036053097080879, 2.57975...","([8804, 19862, 30006, 40337, 46639, 47032, 509..."
4,262145,"[17141, 24145, 35715, 48549, 54961, 61231, 755...","[5.259196648395089, 3.550958151216895, 4.47424...","([17141, 24145, 35715, 48549, 54961, 61231, 75..."


In [None]:
# Do the same for the label: expand each `stars_one_hot` dict into one column per key
labels_series = hope_this_works["stars_one_hot"].apply(pd.Series)
labels_series.head(n=5)

Unnamed: 0,type,size,indices,values
0,0,5,[1],[1.0]
1,0,5,[0],[1.0]
2,0,5,[3],[1.0]
3,0,5,[1],[1.0]
4,0,5,[0],[1.0]


In [None]:
# Drop `type` since it is not needed
labels_series = labels_series.drop(columns="type")
labels_series.head(n=3)

Unnamed: 0,size,indices,values
0,5,[1],[1.0]
1,5,[0],[1.0]
2,5,[3],[1.0]


In [None]:
# Start the label DataFrame as an independent copy of labels_series
newer_df = labels_series.copy(deep=True)

In [None]:
# Rebuild each label as a (size, indices, values) tuple, matching how the
# sparse one-hot vector is displayed in the Spark output
newer_df["label_list"] = [
    (vec_size, vec_indices, vec_values)
    for vec_size, vec_indices, vec_values in zip(
        labels_series["size"],
        labels_series["indices"],
        labels_series["values"],
    )
]
newer_df.head(n=3)

Unnamed: 0,size,indices,values,label_list
0,5,[1],[1.0],"(5, [1], [1.0])"
1,5,[0],[1.0],"(5, [0], [1.0])"
2,5,[3],[1.0],"(5, [3], [1.0])"


In [None]:
# All three source columns are now inside `label_list`, so drop them
newer_df = newer_df.drop(columns=["size", "indices", "values"])
newer_df.head(n=3)

Unnamed: 0,label_list
0,"(5, [1], [1.0])"
1,"(5, [0], [1.0])"
2,"(5, [3], [1.0])"


In [None]:
# Place the feature columns, the label tuple and the raw star rating side by side
# in the DataFrame used for the machine learning model
machine_df = pd.concat(
    [
        new_df,
        newer_df,
        hope_this_works["stars"],
    ],
    axis=1,
)
machine_df.head(n=5)

Unnamed: 0,size,indices,values,features,size.1,indices.1,values.1,label_list,stars
0,262145,"[9521, 13381, 24113, 34146, 47205, 48870, 5265...","[13.146379889022644, 4.547001272864449, 1.6436...","([9521, 13381, 24113, 34146, 47205, 48870, 526...",5,[1],[1.0],"(5, [1], [1.0])",5
1,262145,"[78, 3188, 4200, 4821, 5381, 5947, 8391, 8527,...","[3.9846936932633152, 8.111828078308406, 2.9151...","([78, 3188, 4200, 4821, 5381, 5947, 8391, 8527...",5,[0],[1.0],"(5, [0], [1.0])",4
2,262145,"[1846, 4106, 7917, 8287, 8630, 8769, 13677, 15...","[11.89024888444809, 6.812545094178145, 4.77962...","([1846, 4106, 7917, 8287, 8630, 8769, 13677, 1...",5,[3],[1.0],"(5, [3], [1.0])",2
3,262145,"[8804, 19862, 30006, 40337, 46639, 47032, 5099...","[6.593874722676491, 5.036053097080879, 2.57975...","([8804, 19862, 30006, 40337, 46639, 47032, 509...",5,[1],[1.0],"(5, [1], [1.0])",5
4,262145,"[17141, 24145, 35715, 48549, 54961, 61231, 755...","[5.259196648395089, 3.550958151216895, 4.47424...","([17141, 24145, 35715, 48549, 54961, 61231, 75...",5,[0],[1.0],"(5, [0], [1.0])",4


In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix, accuracy_score, roc_auc_score, roc_curve

In [None]:
# Features and target for the scikit-learn model.
# `indices` is dropped from new_df before machine_df is built, so selecting
# machine_df['indices'] raises KeyError on a fresh top-to-bottom run; use the
# combined `features` column, which is what machine_df carries.
x = machine_df["features"]
y = machine_df["stars"]
# Hold out 20% of the rows for testing; the fixed random_state keeps the split reproducible
x_train, x_test, y_train, y_test = train_test_split(
    x,
    y,
    test_size=0.2,
    random_state=101,
)