## CS 297.2 Big Data Processing: Spark SQL and SparkML

Datasets used for this notebook can be found here: https://drive.google.com/drive/folders/1qYg9SXcc9minIErchqWXR9Rq4CYj7dqf

wyu@ateneo.edu

---

### Installing Spark on the machine

Once you have installed Java, the remaining steps should be similar on most machines. You will likely want to put the Spark application folder wherever you keep your user-installed applications.

In [1]:
!rm -r spark*
!wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!ls
!tar xvf ./spark-3.5.1-bin-hadoop3.tgz > /dev/null 2>/dev/null
!ls
!pip install -q findspark

rm: cannot remove 'spark*': No such file or directory
--2024-04-25 21:56:19--  https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400446614 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.1-bin-hadoop3.tgz’


2024-04-25 21:56:37 (21.7 MB/s) - ‘spark-3.5.1-bin-hadoop3.tgz’ saved [400446614/400446614]

sample_data  spark-3.5.1-bin-hadoop3.tgz
sample_data  spark-3.5.1-bin-hadoop3  spark-3.5.1-bin-hadoop3.tgz


In [2]:
# Set environment variables
import os
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3/"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-11-openjdk-amd64/jre/bin/java
!java -version

update-alternatives: error: alternative /usr/lib/jvm/java-11-openjdk-amd64/jre/bin/java for java not registered; not setting
openjdk version "11.0.22" 2024-01-16
OpenJDK Runtime Environment (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1)
OpenJDK 64-Bit Server VM (build 11.0.22+7-post-Ubuntu-0ubuntu222.04.1, mixed mode, sharing)
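
Before calling `findspark.init()`, it can help to confirm that the directories named in the environment variables actually exist. The following is a small sketch using only the standard library; the helper name `missing_paths` is an assumption made for illustration and is not part of findspark or Spark.

```python
import os

def missing_paths(var_names, environ=os.environ):
    """Return the variables that are unset or do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both variables point at real directories.
print(missing_paths(["SPARK_HOME", "JAVA_HOME"]))
```

If either name is printed, the corresponding path set in the cell above is probably wrong for this machine.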


____
## Sample 1: Spark SQL examples

Using SQL in Spark

In [3]:
DATA_PATH = '/content/drive/MyDrive/Big Data/Input'

input_file1_path = 'irdata-v3.csv'
input_file2_path = 'cc.csv'

# Load the Drive helper and mount
from google.colab import drive

# This will prompt for authorization and mount your drive.
drive.mount('/content/drive')
%cd /content/drive/MyDrive/Big Data


Mounted at /content/drive
/content/drive/MyDrive/Big Data


In [4]:
# If Spark is installed and SPARK_HOME is set, this will find the spark installation so spark libraries can be imported.
# findspark is necessary if you want to use Spark in the IDE of your choice.
import findspark
findspark.init()

# Imports the basic spark functions needed
from pyspark import SparkConf, SparkContext
from operator import add
import io

# Sets the Spark configuration. The AppName is arbitrary, but setting the master to local
# specifies that the application is not running on a distributed system
conf = SparkConf().setMaster("local").setAppName("SparkQLExample")
sc = SparkContext.getOrCreate(conf = conf)

In [5]:
# check if the context is available. This might require some waiting
sc

In [6]:
from pyspark import SQLContext

# create spark sql context
sqlContext = SQLContext(sc)



In [7]:
# read data from filesystem
contraceptionData = sqlContext.read.csv(f"{DATA_PATH}/{input_file1_path}.gz", header = True, inferSchema = True).cache()

# did we get the data?
contraceptionData.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Residence Type: integer (nullable = true)
 |-- Religion: string (nullable = true)
 |-- Educ in single years: integer (nullable = true)
 |-- Encouraged FP: integer (nullable = true)
 |-- Used method in last 12 months: integer (nullable = true)
 |-- HH Head: integer (nullable = true)
 |-- Land Owner: integer (nullable = true)
 |-- Earns more: integer (nullable = true)
 |-- DM Contraception: integer (nullable = true)
 |-- DM Husband's Earnings: integer (nullable = true)
 |-- Depression/ anxiety: integer (nullable = true)
 |-- Total CEB: integer (nullable = true)
 |-- Number of SC: integer (nullable = true)



In [8]:
contraceptionData.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16155 entries, 0 to 16154
Data columns (total 14 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   Age                            16155 non-null  int32  
 1   Residence Type                 16155 non-null  int32  
 2   Religion                       16155 non-null  object 
 3   Educ in single years           16155 non-null  int32  
 4   Encouraged FP                  7372 non-null   float64
 5   Used method in last 12 months  3710 non-null   float64
 6   HH Head                        16155 non-null  int32  
 7   Land Owner                     16155 non-null  int32  
 8   Earns more                     5207 non-null   float64
 9   DM Contraception               5313 non-null   float64
 10  DM Husband's Earnings          9767 non-null   float64
 11  Depression/ anxiety            2164 non-null   float64
 12  Total CEB                      16155 non-null 

In [9]:
# is my data clean?
contraceptionData.toPandas()["Used method in last 12 months"].value_counts()

Used method in last 12 months
0.0    3009
1.0     665
9.0      36
Name: count, dtype: int64

In [10]:
# let us start cleaning it.
# replace empty strings in string columns with "-1"
from pyspark.sql.functions import col
contraceptionDataStringReplaced = contraceptionData.na.replace("", "-1")

# cast a string column to float (values that are not numeric become null)
contraceptionDataCasted = contraceptionDataStringReplaced.withColumn("Religion", col("Religion").cast("float"))

# use 2 to mean no answer; the existing labels are 0, 1 and 9, so 2 is an unused value
notNull = contraceptionDataCasted.fillna({ 'Used method in last 12 months':2 })

# remaining nulls in the other columns are filled with -1
df = notNull.fillna(-1)

In [11]:
# look at cleaned version
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16155 entries, 0 to 16154
Data columns (total 14 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   Age                            16155 non-null  int32  
 1   Residence Type                 16155 non-null  int32  
 2   Religion                       16155 non-null  float32
 3   Educ in single years           16155 non-null  int32  
 4   Encouraged FP                  16155 non-null  int32  
 5   Used method in last 12 months  16155 non-null  int32  
 6   HH Head                        16155 non-null  int32  
 7   Land Owner                     16155 non-null  int32  
 8   Earns more                     16155 non-null  int32  
 9   DM Contraception               16155 non-null  int32  
 10  DM Husband's Earnings          16155 non-null  int32  
 11  Depression/ anxiety            16155 non-null  int32  
 12  Total CEB                      16155 non-null 

In [12]:
df.toPandas()["Used method in last 12 months"].value_counts()


Used method in last 12 months
2    12445
0     3009
1      665
9       36
Name: count, dtype: int64
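
As a quick sanity check on the fill step, the number of rows relabeled to 2 should equal the number of rows that had no answer before cleaning. A minimal sketch in plain Python, using only the counts printed in the outputs above (the variable names are illustrative and not part of the notebook):

```python
# Counts copied from the value_counts() outputs above.
total_rows = 16155
counts_before = {0: 3009, 1: 665, 9: 36}            # non-null answers before cleaning
counts_after = {2: 12445, 0: 3009, 1: 665, 9: 36}   # after filling nulls with 2

non_null_before = sum(counts_before.values())
null_before = total_rows - non_null_before           # rows that had no answer

# Every previously null row should now carry the placeholder label 2,
# and no rows should have been gained or lost.
assert counts_after[2] == null_before
assert sum(counts_after.values()) == total_rows
print(non_null_before, null_before)
```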

In [13]:
# show first few cleaned up rows
df.toPandas()

Unnamed: 0,Age,Residence Type,Religion,Educ in single years,Encouraged FP,Used method in last 12 months,HH Head,Land Owner,Earns more,DM Contraception,DM Husband's Earnings,Depression/ anxiety,Total CEB,Number of SC
0,19,0,1.0,10,-1,2,1,0,-1,-1,-1,-1,0,0
1,47,0,1.0,10,-1,0,0,0,-1,-1,1,-1,2,2
2,20,0,1.0,11,-1,1,0,0,0,1,1,-1,1,1
3,22,0,1.0,13,-1,2,0,0,-1,-1,-1,-1,1,1
4,34,0,1.0,8,-1,0,0,0,-1,-1,1,0,3,3
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
16150,33,1,0.0,16,0,2,0,0,1,1,1,-1,3,3
16151,24,1,0.0,10,0,1,0,0,-1,1,1,1,4,3
16152,42,1,0.0,10,0,2,0,0,-1,1,1,0,3,3
16153,37,1,0.0,8,0,1,0,0,-1,1,1,0,2,2


In [14]:
# select contents of one column
df.select("Land Owner").show()

+----------+
|Land Owner|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [15]:
# filter data
df.filter(df.Age > 30).show()

+---+--------------+--------+--------------------+-------------+-----------------------------+-------+----------+----------+----------------+---------------------+-------------------+---------+------------+
|Age|Residence Type|Religion|Educ in single years|Encouraged FP|Used method in last 12 months|HH Head|Land Owner|Earns more|DM Contraception|DM Husband's Earnings|Depression/ anxiety|Total CEB|Number of SC|
+---+--------------+--------+--------------------+-------------+-----------------------------+-------+----------+----------+----------------+---------------------+-------------------+---------+------------+
| 47|             0|     1.0|                  10|           -1|                            0|      0|         0|        -1|              -1|                    1|                 -1|        2|           2|
| 34|             0|     1.0|                   8|           -1|                            0|      0|         0|        -1|              -1|                    1|         

In [16]:
# groupby religion
df.select("Age","Residence Type","Religion").groupby("Religion").count().show()

+--------+-----+
|Religion|count|
+--------+-----+
|    99.0|   16|
|    -1.0|    4|
|     1.0|12078|
|     0.0| 4057|
+--------+-----+



In [17]:
# groupby method
df.select("Age","Residence Type","Used method in last 12 months").groupby("Used method in last 12 months").count().show()

+-----------------------------+-----+
|Used method in last 12 months|count|
+-----------------------------+-----+
|                            1|  665|
|                            9|   36|
|                            2|12445|
|                            0| 3009|
+-----------------------------+-----+



In [18]:
# groupby religion, only for rows with Age > 30
df.select("Age","Residence Type","Religion").filter(df.Age > 30).groupby("Religion").count().show()

+--------+-----+
|Religion|count|
+--------+-----+
|    99.0|    7|
|    -1.0|    2|
|     1.0| 5677|
|     0.0| 1793|
+--------+-----+



In [19]:
# query as SQL
# createOrReplaceTempView is the current replacement for the deprecated registerTempTable
df.createOrReplaceTempView("loans")
sqlContext.sql("SELECT Religion, count(*) FROM loans WHERE Age > 30 GROUP BY Religion").show()




+--------+--------+
|Religion|count(1)|
+--------+--------+
|    99.0|       7|
|    -1.0|       2|
|     1.0|    5677|
|     0.0|    1793|
+--------+--------+



---
## Exercise: Studying input file details in SQL
This activity processes the cc.csv file using Spark SQL.

1.   Load the file cc.csv in the SampleData directory
2.   Write the same query twice, once with the DataFrame API and once with SQL: show the number of entries by gender, counting only the entries whose flag is TRUE and whose amount is greater than $1000
3.   What do you think the cc.csv file represents?



In [20]:
findspark.init()
from pyspark import SparkConf, SparkContext
from operator import add
import io
conf = SparkConf().setMaster("local").setAppName("Spark_CC")
sc = SparkContext.getOrCreate(conf = conf)
sqlContext = SQLContext(sc)
cc = sqlContext.read.csv(f"{DATA_PATH}/{input_file2_path}", header = True, inferSchema = True).cache()
cc.toPandas().info()



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 250 entries, 0 to 249
Data columns (total 7 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   first   250 non-null    object
 1    last   250 non-null    object
 2   gender  250 non-null    object
 3   age     250 non-null    int32 
 4   city    250 non-null    object
 5   dollar  250 non-null    object
 6    bool   250 non-null    bool  
dtypes: bool(1), int32(1), object(5)
memory usage: 11.1+ KB


In [21]:
from pyspark.sql import functions as F

df = cc.select([F.col(col).alias(col.replace(' ', '')) for col in cc.columns])
df = df.withColumnRenamed('bool', 'bool_col')
df = df.withColumn('dollar', F.regexp_replace(F.col('dollar'), '[$,]', '').cast('double'))
df.limit(2).show()

df.toPandas()["gender"].value_counts()

+-------+-----+------+---+-------+-------+--------+
|  first| last|gender|age|   city| dollar|bool_col|
+-------+-----+------+---+-------+-------+--------+
| Elnora| Rowe|Female| 60|Vuskeso|5073.26|    true|
|Gregory|Hicks|Female| 42|Modfima|8973.87|   false|
+-------+-----+------+---+-------+-------+--------+



gender
Female    127
Male      123
Name: count, dtype: int64
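
The `regexp_replace` call above strips `$` and `,` characters before the column is cast to a number. The same pattern can be tried in plain Python with the standard `re` module. This is only an illustration of the regular expression: the helper name `parse_dollar` and the sample strings are assumptions, since the raw format of the `dollar` column is not shown in the notebook.

```python
import re

def parse_dollar(text: str) -> float:
    """Remove '$' and ',' characters, then convert the remainder to a float."""
    return float(re.sub(r"[$,]", "", text))

# Hypothetical raw values, chosen only to exercise the pattern.
samples = ["$5,073.26", "$8973.87", "950"]
parsed = [parse_dollar(s) for s in samples]
print(parsed)
```

One difference worth keeping in mind: Spark's `cast('double')` yields null for a string it cannot parse, whereas Python's `float` raises `ValueError`.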

1.

In [22]:
# show query using data frame mechanism
# bool_col is a boolean column, so it can serve directly as the filter condition
df.filter(df.bool_col).filter(df.dollar > 1000).groupby('gender').count().show()

+------+-----+
|gender|count|
+------+-----+
|Female|   54|
|  Male|   55|
+------+-----+



2.

In [23]:
# show query using SQL mechanism
df.createOrReplaceTempView("cc_sql")
sqlContext.sql("SELECT gender, COUNT(*) AS count FROM cc_sql WHERE bool_col = True AND dollar > 1000 GROUP BY gender").show()

+------+-----+
|gender|count|
+------+-----+
|Female|   54|
|  Male|   55|
+------+-----+
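
The DataFrame query and the SQL query express the same logic: keep rows where the flag is true and the amount exceeds 1000, then count per gender. A minimal sketch of that logic in plain Python follows. The rows are made up for illustration and are not taken from cc.csv.

```python
from collections import Counter

# Hypothetical rows: (gender, dollar, bool_col)
rows = [
    ("Female", 5073.26, True),
    ("Female", 8973.87, False),   # dropped: flag is False
    ("Male", 1200.00, True),
    ("Male", 999.99, True),       # dropped: amount is not above 1000
    ("Female", 1000.01, True),
]

# Filter first, then count the remaining rows per gender.
counts = Counter(
    gender
    for gender, dollar, flag in rows
    if flag and dollar > 1000
)
print(dict(counts))
```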



3.   What do you think the cc.csv file represents?

Answer: The dataset appears to describe a group of individuals (first name, last name, gender, age, city), each with an associated dollar amount and a boolean flag. The flag may mark some criterion or attribute of each individual, but without further context it is hard to determine precisely what it represents.

In [24]:
# If Spark is installed and SPARK_HOME is set, this will find the spark installation so spark libraries can be imported.
# findspark is necessary if you want to use Spark in the IDE of your choice.
import findspark
findspark.init()

# Imports the basic spark functions needed
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkQLExample")
sc = SparkContext.getOrCreate(conf = conf)

from pyspark import SQLContext

# create spark sql context
sqlContext = SQLContext(sc)



____
## Sample 2: Let's jump to Machine Learning

Using ML in Spark

In [26]:
# create spark sql context
sqlContext = SQLContext(sc)



In [27]:
# import necessary libraries
from itertools import chain
import collections
import pandas as pd

# ML stuff
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col


In [28]:
contraceptionData = sqlContext.read.csv(f"{DATA_PATH}/{input_file1_path}.gz", header = True, inferSchema = True).cache()
# let us start cleaning it.
# replace empty strings in string columns with "-1"
contraceptionDataStringReplaced = contraceptionData.na.replace("", "-1")

# cast a string column to float (values that are not numeric become null)
contraceptionDataCasted = contraceptionDataStringReplaced.withColumn("Religion", col("Religion").cast("float"))

# use 2 to mean no answer; the existing labels are 0, 1 and 9, so 2 is an unused value
notNull = contraceptionDataCasted.fillna({ 'Used method in last 12 months':2 })

# remaining nulls in the other columns are filled with -1
df = notNull.fillna(-1)

In [29]:
df = df.select('Age', 'Residence Type', 'Religion', 'Educ in single years', 'Encouraged FP', 'HH Head', 'Land Owner', 'Earns more', 'DM Contraception', 'Depression/ anxiety', 'Total CEB', 'Number of SC', 'Used method in last 12 months')

In [30]:
df

DataFrame[Age: int, Residence Type: int, Religion: float, Educ in single years: int, Encouraged FP: int, HH Head: int, Land Owner: int, Earns more: int, DM Contraception: int, Depression/ anxiety: int, Total CEB: int, Number of SC: int, Used method in last 12 months: int]

In [31]:
# recap on what is in the schema
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16155 entries, 0 to 16154
Data columns (total 13 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   Age                            16155 non-null  int32  
 1   Residence Type                 16155 non-null  int32  
 2   Religion                       16155 non-null  float32
 3   Educ in single years           16155 non-null  int32  
 4   Encouraged FP                  16155 non-null  int32  
 5   HH Head                        16155 non-null  int32  
 6   Land Owner                     16155 non-null  int32  
 7   Earns more                     16155 non-null  int32  
 8   DM Contraception               16155 non-null  int32  
 9   Depression/ anxiety            16155 non-null  int32  
 10  Total CEB                      16155 non-null  int32  
 11  Number of SC                   16155 non-null  int32  
 12  Used method in last 12 months  16155 non-null 

In [32]:
# split data
train, test = df.randomSplit([0.7, 0.3], seed=30)
train.count()

11239

In [33]:
print(train.count())

11239


In [34]:
print(test.count())

4916
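
`randomSplit` assigns rows to the splits at random, so the weights `[0.7, 0.3]` are targets rather than exact proportions. A small check in plain Python, using the two counts printed above:

```python
train_count = 11239
test_count = 4916
total = train_count + test_count

train_share = train_count / total
test_share = test_count / total

# The shares land close to, but not exactly on, 0.7 and 0.3.
print(total)
print(round(train_share, 4))
print(round(test_share, 4))
```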


In [37]:
# assemble ML pipeline using LR
assembler = VectorAssembler(
    inputCols=['Age', 'Residence Type', 'Religion', 'Educ in single years', 'Encouraged FP', 'HH Head', 'Land Owner', 'Earns more', 'DM Contraception', 'Depression/ anxiety', 'Total CEB', 'Number of SC'],
    outputCol="features")
lr = LogisticRegression(featuresCol = 'features', labelCol = 'Used method in last 12 months', maxIter=10)
classificationPipeline = Pipeline(stages=[assembler, lr])
classificationPipelineModel = classificationPipeline.fit(train)
classified = classificationPipelineModel.transform(train)

# create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.25, 0.5, 1.0, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10, 15, 20])
             .build())
prediction = classificationPipelineModel.transform(test)

In [None]:
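# evaluate model based on default parameters and start training!
# Note: BinaryClassificationEvaluator's default metric is areaUnderROC, and the label
# here takes four values (0, 1, 2, 9), so the printed figure is not a plain accuracy.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol = "Used method in last 12 months")
print('Accuracy before Cross Validation ', evaluator.evaluate(prediction))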

cv = CrossValidator(estimator=classificationPipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

cvModel = cv.fit(train)

Accuracy before Cross Validation  0.5469543503651267
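
The figure printed above comes from `BinaryClassificationEvaluator`, whose default metric is area under the ROC curve, while the label column here takes four values (0, 1, 2, 9). If plain accuracy is what is wanted, it is simply the fraction of rows where the prediction equals the label; in Spark, `MulticlassClassificationEvaluator` with `metricName="accuracy"` computes it. The sketch below shows only the definition, in plain Python, on hypothetical (label, prediction) pairs that are not results from this notebook.

```python
def accuracy(pairs):
    """Fraction of (label, prediction) pairs where the two values match."""
    pairs = list(pairs)
    if not pairs:
        raise ValueError("no rows to evaluate")
    correct = sum(1 for label, prediction in pairs if label == prediction)
    return correct / len(pairs)

# Hypothetical rows; labels use the same coding as the notebook (0, 1, 2, 9).
example = [(2, 2), (2, 2), (0, 2), (1, 2), (9, 2), (0, 0), (2, 2), (1, 0)]
print(accuracy(example))
```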


In [None]:
# evaluate model after CV validation
predictions_cvModel = cvModel.transform(test)

print('Accuracy after Cross Validation: ', evaluator.evaluate(predictions_cvModel))

---
## Exercise: This is so slow, why not run it in the cloud?
Convert the SparkML script above into an EMR/Dataproc/HDInsight Spark job
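
Part of the reason the cross-validation step is slow is the number of models it fits: `CrossValidator` trains one model per parameter combination per fold, then refits the best combination on the full training set. A quick count for the grid defined earlier, in plain Python (the variable names are illustrative):

```python
from itertools import product

# Values copied from the ParamGridBuilder call in the notebook.
reg_param = [0.01, 0.25, 0.5, 1.0, 2.0]
elastic_net_param = [0.0, 0.25, 0.5, 0.75, 1.0]
max_iter = [1, 5, 10, 15, 20]
num_folds = 5

combinations = list(product(reg_param, elastic_net_param, max_iter))
fits_during_cv = len(combinations) * num_folds
total_fits = fits_during_cv + 1   # plus the final refit with the best parameters

print(len(combinations), fits_during_cv, total_fits)
```

With `setMaster("local")` these fits run on a single worker thread, which is why moving the job to a cluster, or shrinking the grid, makes a noticeable difference.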


In [36]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, FloatType

# from pyspark import SparkContext, SQLContext
# from pyspark.sql.types import *
# from pyspark.sql.functions import col, udf

# from itertools import chain
# import collections

# from pyspark.ml import Pipeline
# from pyspark.ml.classification import LogisticRegression
# from pyspark.ml.evaluation import BinaryClassificationEvaluator
# from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# # create spark session
# spark = SparkSession.builder.master("local[*]").appName("SparkMLLRApp").getOrCreate()

# # read data from filesystem
# filename = "gs://csci273-bucket/sparkml/hw15/data/IR Data_Editted_ver3 - IR Data_Editted_ver3.csv"
# contraceptionData = spark.read.csv(filename, header = True, inferSchema = True).cache()

# # clean data
# contraceptionDataStringReplaced = contraceptionData.na.replace("", "-1")
# contraceptionDataCasted = contraceptionDataStringReplaced.withColumn("Religion", col("Religion").cast("float"))
# notNull = contraceptionDataCasted.fillna({ 'Used method in last 12 months':2 })
# df = notNull.fillna(-1)

# # split data
# train, test = df.randomSplit([0.7, 0.3], seed=30)

# # assemble ML pipeline using LR
# assembler = VectorAssembler(
#     inputCols=['Age', 'Residence Type', 'Religion', 'Educ in single years', 'Encouraged FP', 'HH Head', 'Land Owner', 'Earns more', 'DM Contraception', 'Depression/ anxiety', 'Total CEB', 'Number of SC'],
#     outputCol="features")
# model_lr = LogisticRegression(featuresCol = 'features', labelCol = 'Used method in last 12 months', maxIter=10)
# classificationPipeline = Pipeline(stages=[assembler, model_lr])
# classificationPipelineModel = classificationPipeline.fit(train)
# classified = classificationPipelineModel.transform(train)
# prediction = classificationPipelineModel.transform(test)

# # show pre-crossval accuracy
# evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol = "Used method in last 12 months")
# acc_before_cv = evaluator.evaluate(prediction)

# # Cross Validation
# paramGrid = (ParamGridBuilder()
#              .addGrid(model_lr.regParam, [0.01, 0.25, 0.5, 1.0, 2.0])
#              .addGrid(model_lr.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
#              .addGrid(model_lr.maxIter, [1, 5, 10, 15, 20])
#              .build())

# cv = CrossValidator(estimator=classificationPipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# cvModel = cv.fit(train)

# # show post crossval accuracy
# predictions_cvModel = cvModel.transform(test)
# acc_after_cv = evaluator.evaluate(predictions_cvModel)

# rdd = spark.sparkContext.parallelize([(acc_before_cv, acc_after_cv)])
# schema = StructType([StructField("acc_before_cv", FloatType(), True), StructField("acc_after_cv", FloatType(), True)])
# acc = spark.createDataFrame(rdd, schema=schema)

# acc.write.format("csv").option("header", "true").save("gs://csci273-bucket/sparkml/hw15/full/w2/output")