# Agenda

>1. DBMS: Read and Write
>   * MongoDB
>   * PostgreSQL
>2. Preprocessing and Regression
>   * Initialize Spark with both MongoDB and PostgreSQL
>   * Prepare data
>   * Logistic Regression
>   * Linear Regression
>3. Further examples
>   * Pipeline 
>   * Word2Vec

# Libraries

In [None]:
# to create a spark session object
from pyspark.sql import SparkSession
# data types
from pyspark.sql.types import *

# DBMS: Read and Write

**Save Modes:**

 [<img src="images/_1.png" width=70%>](https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html)
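
As a rough illustration of how the save modes differ, the sketch below mimics them on an in-memory dictionary. The `save` helper and the `store` dictionary are hypothetical stand-ins for a real data source; only the mode names (`append`, `overwrite`, `ignore`, `error`/`errorifexists`) come from Spark.

```python
def save(store, table, rows, mode="error"):
    """Mimic Spark's save modes against a dict that maps table name -> list of rows."""
    exists = table in store
    if mode == "append":
        # add the new rows to whatever is already there
        store[table] = store.get(table, []) + list(rows)
    elif mode == "overwrite":
        # replace existing data with the new rows
        store[table] = list(rows)
    elif mode == "ignore":
        # silently do nothing when the table already exists
        if not exists:
            store[table] = list(rows)
    elif mode in ("error", "errorifexists"):
        # default behaviour: refuse to touch an existing table
        if exists:
            raise ValueError("table '{}' already exists".format(table))
        store[table] = list(rows)
    else:
        raise ValueError("unknown mode: {}".format(mode))
    return store

store = {}
save(store, "test", [1, 2], mode="overwrite")
save(store, "test", [3], mode="append")
save(store, "test", [99], mode="ignore")
print(store)
```

The cells below use `append` for MongoDB and `overwrite` for PostgreSQL.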

## MongoDB

In [None]:
# Session with Mongo
spark_mongo = SparkSession \
    .builder \
    .appName("music_mongo") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/amazon.music") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/amazon.msample") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()

In [None]:
spark_mongo

In [None]:
# Let's create an aggregation pipeline:
# keep reviews with 1 < overall < 5 and drop the fields we do not need
pipeline = "[{'$match': {'overall': {'$gt': 1, '$lt': 5}}}, {'$unset': ['helpful', 'reviewTime']}]"

In [None]:
# load data from MongoDB
df = spark_mongo.read.format("mongo").option("pipeline", pipeline).load()

df.printSchema()
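
To make the effect of the aggregation pipeline concrete, here is a minimal pure-Python sketch of what the `$match` and `$unset` stages do to each document. The sample documents and the `apply_pipeline` helper are hypothetical; MongoDB runs the real stages on the server before Spark receives any data.

```python
# Hypothetical documents shaped like the reviews collection
docs = [
    {"overall": 5.0, "reviewText": "Great", "helpful": [1, 1], "reviewTime": "01 1, 2014"},
    {"overall": 3.0, "reviewText": "Fine", "helpful": [0, 2], "reviewTime": "02 3, 2014"},
    {"overall": 1.0, "reviewText": "Poor", "helpful": [0, 0], "reviewTime": "03 5, 2014"},
]

def apply_pipeline(documents, low=1, high=5, drop=("helpful", "reviewTime")):
    """Mimic {$match: {overall: {$gt: low, $lt: high}}} followed by {$unset: drop}."""
    # $match: keep documents whose rating lies strictly between low and high
    kept = [d for d in documents if low < d["overall"] < high]
    # $unset: remove the listed fields from every remaining document
    return [{k: v for k, v in d.items() if k not in drop} for d in kept]

result = apply_pipeline(docs)
print(result)
```

Filtering in the pipeline means that only the matching documents and fields travel from MongoDB to Spark.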

In [None]:
# get some stats
df.describe(['overall', 'unixReviewTime']).show()

In [None]:
# Get a 1% sample (the seed makes it reproducible)
# sample(withReplacement=None, fraction=None, seed=None)
df = df.sample(fraction=0.01, seed=123)

In [None]:
# to tokenize
from pyspark.ml.feature import Tokenizer

# Tokenize text
tokenizer = Tokenizer(outputCol="tokenized", inputCol="reviewText")
df=tokenizer.transform(df)

In [None]:
# Let's append the sample to the output collection (amazon.msample)
df.write.format("mongo").mode("append").save()

## PostgreSQL

In [None]:
# Open a session with PostgreSQL
spark_postgre = SparkSession \
    .builder \
    .appName("tate_postgre") \
    .config("spark.jars", "/Users/matteodevigili/share/py4j/postgresql-42.7.3.jar") \
    .getOrCreate()

In [None]:
spark_postgre

 <img src="images/_0.png" width=90%>

In [None]:
# Read data from PostgreSQL running at localhost
params = spark_postgre.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/tate") \
    .option("user", "postgres") \
    .option("password", "smm695") \
    .option("driver", "org.postgresql.Driver")

df_0 = params \
    .option("dbtable", "artworks_id") \
    .load()

df_1 = params \
    .option("dbtable", "artworks") \
    .load()

df_2 = params \
    .option("dbtable", "artists") \
    .load()

df_3 = params \
    .option("dbtable", "roles") \
    .load()

df_0.printSchema()
df_1.printSchema()
df_2.printSchema()
df_3.printSchema()

In [None]:
# Create temporary tables
df_0.createOrReplaceTempView('artworks_id')
df_1.createOrReplaceTempView('artworks')
df_2.createOrReplaceTempView('artists')
df_3.createOrReplaceTempView('roles')


df = spark_postgre.sql("""
SELECT title, year, artist, artistrole
FROM artworks aws
JOIN artworks_id aid ON aid.accession_number = aws.accession_number
JOIN artists ar ON ar.artistid = aid.artistid
JOIN roles r ON r.role_id = aid.role_id
WHERE year IS NOT NULL""")

df.show()

In [None]:
from pyspark.ml.feature import StringIndexer

# indexer
indexer = StringIndexer() \
    .setInputCol("artistrole") \
    .setOutputCol("artistrole_index")

# fit the indexer
fitted = indexer.fit(df)

# modify data
df = fitted.transform(df)

# show five
df.take(5)
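
The sketch below shows, in plain Python, the mapping that `StringIndexer` builds with its default `frequencyDesc` ordering: the most frequent label gets index 0. The `string_index` helper and the sample list are hypothetical, and breaking ties alphabetically is an assumption based on the behaviour of recent Spark versions.

```python
from collections import Counter

def string_index(values):
    """Map each label to an index; the most frequent label gets 0.0."""
    counts = Counter(values)
    # sort by descending frequency, then alphabetically to break ties (assumed)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical sample of the artistrole column
roles = ["artist", "after", "artist", "attributed to", "after", "artist"]
mapping = string_index(roles)
print(mapping)
```

This is why `artist`, the most common role, is expected to receive index 0 in the table saved below.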

In [None]:
# save the table
mode = "overwrite"
url = "jdbc:postgresql://localhost:5432/tate"
properties = {"user": "postgres","password": "smm695","driver": "org.postgresql.Driver"}

df.write.jdbc(url=url, table="test", mode=mode, properties=properties)

**Alternative script**
```python
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/tate") \
    .option("dbtable", "test") \
    .option("user", "postgres") \
    .option("password", "smm695") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()
```

**Check on psql or PgAdmin**

```sql
SELECT DISTINCT artistrole, artistrole_index FROM test 
ORDER BY artistrole_index;
```

_Expected result:_

| artistrole | artistrole_index |
| --- | --- |
| artist | 0 |
| after | 1 |
| attributed to | 2 |
| prints after | 3 |
| formerly attributed to | 4 |
| manner of | 5 |
| and assistants | 6 |
| pupil of | 7 |
| and other artists | 8 |
| circle of | 9 |
| follower of | 10 |
| imitator of | 11 |
| studio of | 12 |

# Preprocessing and Regression

In [None]:
# to create a spark session object
from pyspark.sql import SparkSession

# data types
from pyspark.sql.types import *

## Set-up

In [None]:
# Open a session
spark_session = SparkSession \
    .builder \
    .appName("analysis") \
    .config("spark.jars", "/Users/matteodevigili/share/py4j/postgresql-42.7.3.jar") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()

In [None]:
spark_session

In [None]:
# import SparkFiles
from pyspark import SparkFiles

# target dataset
url = 'https://raw.githubusercontent.com/fivethirtyeight/data/master/bechdel/movies.csv'

# loading data with pyspark
spark_session.sparkContext.addFile(url)
df = spark_session.read.csv(SparkFiles.get('movies.csv'), header=True, inferSchema=True)

# let's print the schema
df.printSchema()

## Preprocessing data

In [None]:
# pyspark rename 'budget_2013$'
df=df.withColumnRenamed('budget_2013$', 'budget_2013')

In [None]:
df.describe(['year', 'clean_test', 'binary', 'budget_2013', 'domgross_2013$', 'intgross_2013$']).show()

### Change data-type

In [None]:
# Cast values to int
df = df.withColumn("domgross_2013", df["domgross_2013$"].cast(IntegerType()))
df = df.withColumn("intgross_2013", df["intgross_2013$"].cast(IntegerType()))

### Drop null values

In [None]:
# drop null values
df=df.na.drop("any")

In [None]:
# let's inspect data
hist = df.select(['year','budget_2013', "domgross_2013", "intgross_2013"
                  ]) \
         .sample(False, 0.5, 123) \
         .toPandas() \
         .hist(bins=20,figsize=(12, 8))

### Categorical Features

In [None]:
# Let's inspect string data
df.groupby('clean_test').count().show()
df.groupby('binary').count().show()

In [None]:
# Applying some transformations: String indexer
from pyspark.ml.feature import StringIndexer

# Indexing 'clean_test'
idx_0 = StringIndexer().setInputCol("clean_test").setOutputCol("clean_test_idx")

# Indexing 'binary'
idx_1 = StringIndexer() \
        .setInputCol("binary") \
        .setOutputCol("binary_idx")

# Applying to df
df = idx_0.fit(df).transform(df)
df = idx_1.fit(df).transform(df)

# Inspect result
df.select(['binary', 'binary_idx','clean_test', 'clean_test_idx']).show()

In [None]:
# Step two: One-Hot Encoding
from pyspark.ml.feature import OneHotEncoder

# Encoding 'clean_test'
ohe_0 = OneHotEncoder().setInputCol("clean_test_idx").setOutputCol("clean_test_ohe")

# Encoding 'binary'
ohe_1 = OneHotEncoder().setInputCol("binary_idx").setOutputCol("binary_ohe")

# Let's show what we have
ohe_0.fit(df).transform(df).select(['clean_test', 'clean_test_idx', 'clean_test_ohe']).show()
ohe_1.fit(df).transform(df).select(['binary', 'binary_idx', 'binary_ohe']).show()

# Apply ohe_1 to df
df = ohe_1.fit(df).transform(df)
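
A minimal sketch of the encoding that `OneHotEncoder` produces with its default `dropLast=True`: a column with `n` categories becomes a vector of length `n - 1`, and the last category is encoded as all zeros. The `one_hot` helper is hypothetical and returns a dense list, whereas Spark stores the result as a sparse vector.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return the one-hot vector for a category index as a dense list."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    # with drop_last, the last category has no slot and stays all zeros
    if index < size:
        vec[index] = 1.0
    return vec

print(one_hot(0, 2))         # binary column, first category
print(one_hot(1, 2))         # binary column, last category
print(one_hot(2, 5))         # five categories, third one
print(one_hot(1, 2, False))  # keep every category
```

Dropping the last category avoids perfectly collinear columns, which matters when the encoded vector feeds a regression with an intercept.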

### Assembling Vectors

In [None]:
from pyspark.ml.feature import VectorAssembler

# Assembling a Vector for Logistic Regression
v_0 = VectorAssembler() \
     .setInputCols(["year", "budget_2013", "domgross_2013", "intgross_2013"]) \
     .setOutputCol('features_0')

# Assembling a Vector for Linear Regression
v_1 = VectorAssembler() \
     .setInputCols(["year", "budget_2013", "domgross_2013", "binary_ohe"]) \
     .setOutputCol('features_1')

# Applying
df = v_0.transform(df)
df = v_1.transform(df)

# show
df.select(['features_0', 'features_1']).show()
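
Conceptually, `VectorAssembler` concatenates the input columns, in order, into a single feature vector, flattening any column that is already a vector. The sketch below mirrors that on a plain dictionary; the `assemble` helper and the sample row are hypothetical.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector columns into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # vector column (e.g. a one-hot encoding): append every element
            features.extend(float(v) for v in value)
        else:
            # scalar column: append the single value
            features.append(float(value))
    return features

# Hypothetical row with three numeric columns and one encoded column
row = {"year": 2013, "budget_2013": 13000000, "domgross_2013": 25682380, "binary_ohe": [1.0]}
features_1 = assemble(row, ["year", "budget_2013", "domgross_2013", "binary_ohe"])
print(features_1)
```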

### Splitting train and test

In [None]:
# splitting training and test (the seed makes the split reproducible)
train, test = df.randomSplit([0.7, 0.3], seed=123)
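
`randomSplit` assigns each row to a split at random, so the 70/30 proportions are only approximate, and a seed is needed to get the same split twice. The pure-Python sketch below illustrates the idea; the `random_split` helper is hypothetical and is not Spark's actual sampling code.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # cumulative upper bounds of each split on the [0, 1) interval
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # the last split also catches any floating-point remainder
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train_rows, test_rows = random_split(list(range(1000)), [0.7, 0.3], seed=123)
print(len(train_rows), len(test_rows))
```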

## Estimation

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

# instance of Logistic Regression
lr_0 = LogisticRegression(labelCol="binary_idx",featuresCol="features_0")

# let's inspect the parameters
for x, line in enumerate(lr_0.explainParams().split('\n'), start=1):
    # each line has the form "name: description"
    name, description = line.split(':', 1)
    print("{}. \033[1m{}\033[0m : {}\n".format(x, name, description), flush=True)

In [None]:
# let's fit data
fitLr_0 = lr_0.fit(train)

# Print the coefficients and intercept
print(""" 

Coefficients:
============
{}


Intercept:
=========
{}

""".format(fitLr_0.coefficients, fitLr_0.intercept), flush=True)

# comparing binary and prediction
fitLr_0.transform(train).select("binary_idx", "prediction").show()
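
The `prediction` column follows from the fitted coefficients and intercept: a binary logistic regression passes the linear combination of the features through the logistic function and compares the resulting probability with a threshold (0.5 by default). A minimal sketch, with hypothetical coefficient values:

```python
import math

def predict_proba(features, coefficients, intercept):
    """Probability of label 1 under a binary logistic regression."""
    margin = intercept + sum(w * x for w, x in zip(coefficients, features))
    return 1.0 / (1.0 + math.exp(-margin))

def predict(features, coefficients, intercept, threshold=0.5):
    """Predict 1.0 when the probability exceeds the threshold, else 0.0."""
    return 1.0 if predict_proba(features, coefficients, intercept) > threshold else 0.0

# Hypothetical coefficients and intercept, for illustration only
coefficients, intercept = [1.0, 1.0], 0.0
p = predict_proba([1.0, 1.0], coefficients, intercept)
print(round(p, 4), predict([1.0, 1.0], coefficients, intercept))
print(predict([-1.0, -1.0], coefficients, intercept))
```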

In [None]:
# get some more info
s_0 = fitLr_0.summary

# Obtain the objective per iteration
objectiveHistory = s_0.objectiveHistory

print("""
- Accuracy: {}
- Area Under ROC : {}
- False Positive Rate by Label: {}
- Precision by Label: {}
- Tot. Iterations: {}
- Objective History: 
{}
""".format(s_0.accuracy, s_0.areaUnderROC,
           s_0.falsePositiveRateByLabel, s_0.precisionByLabel,
           s_0.totalIterations, [obj for obj in objectiveHistory]),
      flush=True)
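
For reference, the sketch below computes accuracy, precision by label, and false positive rate by label from a small hypothetical set of labels and predictions. The `summarize` helper is an illustration of the definitions, not Spark's implementation.

```python
def summarize(labels, predictions):
    """Return accuracy plus per-label precision and false positive rate."""
    pairs = list(zip(labels, predictions))
    accuracy = sum(l == p for l, p in pairs) / len(pairs)
    precision, fpr = {}, {}
    for c in sorted(set(labels)):
        tp = sum(l == c and p == c for l, p in pairs)  # correctly predicted as c
        fp = sum(l != c and p == c for l, p in pairs)  # wrongly predicted as c
        tn = sum(l != c and p != c for l, p in pairs)  # correctly predicted as not c
        precision[c] = tp / (tp + fp) if tp + fp else 0.0
        fpr[c] = fp / (fp + tn) if fp + tn else 0.0
    return accuracy, precision, fpr

# Hypothetical labels and predictions
labels = [0, 0, 0, 1, 1, 1]
predictions = [0, 0, 1, 1, 1, 0]
accuracy, precision, fpr = summarize(labels, predictions)
print(accuracy, precision, fpr)
```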

### Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression

# instance of Linear Regression
lr_1 = LinearRegression(labelCol="intgross_2013",featuresCol="features_1")

# let's inspect parameters
for x, line in enumerate(lr_1.explainParams().split('\n'), start=1):
    # each line has the form "name: description"
    name, description = line.split(':', 1)
    print("{}. \033[1m{}\033[0m : {}\n".format(x, name, description), flush=True)

# fit
fitLr_1 = lr_1.fit(train)

In [None]:
# summary stats
s_1 = fitLr_1.summary
print("""
- R^2 adj: {}
- RMSE : {}
- Intercept: {}
- Coefficients: {}
- p-values: {}
""".format(s_1.r2adj, s_1.rootMeanSquaredError, fitLr_1.intercept, fitLr_1.coefficients, s_1.pValues),
      flush=True)
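
The two fit statistics printed above can be reproduced by hand. The sketch below computes RMSE and adjusted R² for a model with an intercept, using small hypothetical values; `regression_summary` is an illustrative helper, not part of PySpark.

```python
import math

def regression_summary(y_true, y_pred, num_features):
    """Return (RMSE, R^2, adjusted R^2) for a model fitted with an intercept."""
    n = len(y_true)
    mean_y = sum(y_true) / n
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))  # residual sum of squares
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)             # total sum of squares
    rmse = math.sqrt(ss_res / n)
    r2 = 1 - ss_res / ss_tot
    # penalize R^2 for the number of features used
    r2_adj = 1 - (1 - r2) * (n - 1) / (n - num_features - 1)
    return rmse, r2, r2_adj

# Hypothetical observations and predictions
y_true = [1.0, 2.0, 3.0, 4.0, 5.0]
y_pred = [1.1, 1.9, 3.2, 3.8, 5.0]
rmse, r2, r2_adj = regression_summary(y_true, y_pred, num_features=1)
print(round(rmse, 4), round(r2, 4), round(r2_adj, 4))
```

Adjusted R² is always at most R², and the gap widens as features are added without improving the fit.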

In [None]:
# let's create a reference dataframe
results = fitLr_1.transform(train).select(['intgross_2013', 'prediction'])
results.createOrReplaceTempView('results')
spark_session.sql(
    "SELECT format_number(intgross_2013,2) as intgross, format_number(prediction,2) as prediction FROM results"
).show()

## Save Data

In [None]:
# save to PostgreSQL
results.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/smm695") \
    .option("dbtable", "results") \
    .option("user", "postgres") \
    .option("password", "smm695") \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [None]:
# save to MongoDB
results.write \
        .option("spark.mongodb.output.uri", "mongodb://127.0.0.1/pyspark.results") \
        .format("mongo") \
        .mode("overwrite") \
        .save()

# Further Examples 

## Pipeline

In [None]:
from pyspark.ml.feature import RFormula
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import TrainValidationSplit

In [None]:
# R Formula to perform simple preprocessing
rf = RFormula()

# logistic regression
lr = LogisticRegression().setLabelCol("binary_idx").setFeaturesCol("features")

# let's create a pipeline
pipeline = Pipeline().setStages([rf, lr])

# setting some parameters
params = ParamGridBuilder()\
         .addGrid(rf.formula, 
                  ["binary_idx ~ budget_2013",
                   "binary_idx ~ year + budget_2013", 
                   "binary_idx ~ year + budget_2013 + domgross_2013 + intgross_2013"]) \
         .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
         .addGrid(lr.regParam, [0.0, 0.01, 0.1]) \
         .build()

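# evaluation
# note: "prediction" holds hard 0/1 labels, so the ROC curve has a single
# operating point; "rawPrediction" would score the full ranking instead
evaluator = BinaryClassificationEvaluator() \
            .setMetricName("areaUnderROC") \
            .setRawPredictionCol("prediction") \
            .setLabelCol("label")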

# hyperparameter tuning
tvs = TrainValidationSplit() \
      .setTrainRatio(0.75) \
      .setEstimatorParamMaps(params) \
      .setEstimator(pipeline) \
      .setEvaluator(evaluator)
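
The parameter grid is the Cartesian product of the values passed to each `addGrid` call, so three formulas, three elastic-net values, and three regularization values give 27 candidate models. The sketch below enumerates the same combinations with `itertools.product`; the dictionary keys are plain strings chosen for illustration, whereas Spark keys its maps by `Param` objects.

```python
from itertools import product

formulas = [
    "binary_idx ~ budget_2013",
    "binary_idx ~ year + budget_2013",
    "binary_idx ~ year + budget_2013 + domgross_2013 + intgross_2013",
]
elastic_net_values = [0.0, 0.5, 1.0]
reg_values = [0.0, 0.01, 0.1]

# one dictionary per candidate model
grid = [
    {"formula": f, "elasticNetParam": e, "regParam": r}
    for f, e, r in product(formulas, elastic_net_values, reg_values)
]
print(len(grid))
print(grid[0])
```

With a train ratio of 0.75, each candidate is fitted on 75% of the training data and scored on the remaining 25%; the best one is then refitted on the full training set.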

In [None]:
# fit 
tvsFitted = tvs.fit(train)

In [None]:
# get info: list every candidate in the parameter grid
for x, param_map in enumerate(tvsFitted.getEstimatorParamMaps(), start=1):
    # look values up by parameter rather than by position in the map
    b = param_map[rf.formula]
    c = param_map[lr.elasticNetParam]
    d = param_map[lr.regParam]
    print("""
    {}. \033[1mModel\033[0m: {} \033[1mElasticNet\033[0m: {} \033[1mRegular.\033[0m: {}""".format(x, b, c, d))

In [None]:
# best model info
best_rf = tvsFitted.bestModel.stages[0]  # fitted RFormula stage
best_lr = tvsFitted.bestModel.stages[1]  # fitted LogisticRegression stage

# read each value by parameter name instead of by position in the param map
print("""
Model: {} 

ElasticNet: {} 

Regularization: {}
""".format(best_rf.getOrDefault("formula"),
           best_lr.getOrDefault("elasticNetParam"),
           best_lr.getOrDefault("regParam")), flush=True)

In [None]:
# Best Model
s = tvsFitted.bestModel.stages[1].summary

# Obtain the objective per iteration
objectiveHistory = s.objectiveHistory

print("""
- Accuracy: {}
- Area Under ROC : {}
- False Positive Rate by Label: {}
- Precision by Label: {}
- Tot. Iterations: {}
- Objective History: 
{}
""".format(s.accuracy, s.areaUnderROC,
           s.falsePositiveRateByLabel, s.precisionByLabel,
           s.totalIterations, [obj for obj in objectiveHistory]),
      flush=True)

In [None]:
# let's see how it performs on the test
evaluator.evaluate(tvsFitted.transform(test))
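
Area under ROC can be read as the probability that a randomly chosen positive example is scored above a randomly chosen negative one, with ties counting as one half. The sketch below computes it that way on hypothetical scores and shows why continuous scores (such as `rawPrediction` or `probability`) carry more ranking information than hard 0/1 predictions. The `auc` helper is illustrative and is not how Spark computes the metric internally.

```python
def auc(labels, scores):
    """Pairwise (Mann-Whitney) estimate of the area under the ROC curve."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    # a positive scored above a negative counts 1, a tie counts 0.5
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Hypothetical labels and model scores
labels = [0, 0, 1, 1]
scores = [0.1, 0.6, 0.4, 0.9]
hard = [1.0 if s > 0.5 else 0.0 for s in scores]  # thresholded predictions

auc_scores = auc(labels, scores)
auc_hard = auc(labels, hard)
print(auc_scores, auc_hard)
```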

## Word2Vec

In [None]:
# libraries
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import format_number as fmt

In [None]:
# let's reload the music collection
df = spark_session \
    .read \
    .format("mongo") \
    .option("uri","mongodb://127.0.0.1/amazon.music")\
    .load()

In [None]:
# get a sample
df=df.sample(False, 0.001, 123)
# count
df.count()

In [None]:
# drop null values
df=df.na.drop(subset=["reviewText"])

In [None]:
# Tokenize text
tokenizer = RegexTokenizer(outputCol="DOC_TOKEN", inputCol="reviewText", toLowercase=True, pattern="\\W")
df=tokenizer.transform(df)

**Expected outcome:**

```python
print(df.first()['reviewText'][0:503], '\n')
print(*df.first()['DOC_TOKEN'][0:100], sep = ", ")
```

>Instead of The Doors Collections this set should have been called 3 for 1, a joking reference to the song 5 to 1. This DVD is three previously released videos from the 80's and early 90's, Dance On Fire, Live At The Hollywood Bowl, and The Soft Parade.Dance On Fire is a slow start for the DVD, it is formatted like a 60's era LP, a collection of songs strung together that may or may not have a connection to each other. Jim Morrison once suggested that between tracks on a Doors album they should put  
>
>instead, of, the, doors, collections, this, set, should, have, been, called, 3, for, 1, a, joking, reference, to, the, song, 5, to, 1, this, dvd, is, three, previously, released, videos, from, the, 80, s, and, early, 90, s, dance, on, fire, live, at, the, hollywood, bowl, and, the, soft, parade, dance, on, fire, is, a, slow, start, for, the, dvd, it, is, formatted, like, a, 60, s, era, lp, a, collection, of, songs, strung, together, that, may, or, may, not, have, a, connection, to, each, other, jim, morrison, once, suggested, that, between, tracks, on, a, doors, album, they, should, put
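
The tokens above can be approximated in plain Python: lowercase the text, split on every non-word character, and drop empty strings (the equivalent of a minimum token length of 1). The `regex_tokenize` helper is hypothetical, and Python's `\W` is Unicode-aware, so results may differ from the JVM regex engine on non-ASCII text.

```python
import re

def regex_tokenize(text, pattern=r"\W", min_token_length=1):
    """Lowercase, split on the pattern, and drop tokens shorter than the minimum."""
    tokens = re.split(pattern, text.lower())
    return [t for t in tokens if len(t) >= min_token_length]

sample = "This DVD is from the 80's and early 90's, Dance On Fire."
tokens = regex_tokenize(sample)
print(tokens)
```

Splitting on `\W` is why `80's` becomes the two tokens `80` and `s` in the expected outcome.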

**You can also remove stopwords:**

```python
# let's remove stopwords
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="DOC_TOKEN", outputCol="DOC_STOP")
df=remover.transform(df)

# let's inspect one review 
print(*df.first()['DOC_STOP'][0:100], sep = ", ")
```
> instead, doors, collections, set, called, 3, 1, joking, reference, song, 5, 1, dvd, three, previously, released, videos, 80, early, 90, dance, fire, live, hollywood, bowl, soft, parade, dance, fire, slow, start, dvd, formatted, like, 60, era, lp, collection, songs, strung, together, may, may, connection, jim, morrison, suggested, tracks, doors, album, put, poems, exactly, happens, video, poem, morrison, video, rare, rarely, seen, elektra, promotional, film, break, others, created, 80, directed, ray, manzarek, specific, intention, video, shown, mtv, wild, child, surely, centerpiece, meant, l, woman, l, woman, almost, entirely, shot, ray, without, vintage, doors, footage, mini, drama, prostitute, serial, killer, john, doe

In [None]:
# parameters
word2Vec = Word2Vec(vectorSize=100, 
                    seed=123, 
                    maxIter=10, 
                    inputCol="DOC_TOKEN", 
                    outputCol="model")

# fit the model
model = word2Vec.fit(df)

In [None]:
# let's see the vectors
model.getVectors().show()

In [None]:
# let's inspect some synonyms
model.findSynonyms("album", 5).select("word", fmt("similarity", 5).alias("similarity")).show()
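
`findSynonyms` ranks words by the cosine similarity between their vectors and the vector of the query word. The sketch below reproduces that ranking on a handful of hypothetical three-dimensional embeddings; real vectors from the fitted model have 100 dimensions.

```python
import math

def cosine(u, v):
    """Cosine similarity between two vectors of equal length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

def find_synonyms(word, vectors, num):
    """Return the num words most similar to word, excluding word itself."""
    target = vectors[word]
    scored = [(w, cosine(target, v)) for w, v in vectors.items() if w != word]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:num]

# Hypothetical embeddings, for illustration only
vectors = {
    "album": [1.0, 0.0, 0.0],
    "record": [0.9, 0.1, 0.0],
    "song": [0.5, 0.5, 0.0],
    "poem": [0.0, 0.0, 1.0],
}
synonyms = find_synonyms("album", vectors, 2)
for word, similarity in synonyms:
    print(word, round(similarity, 5))
```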