<a href="https://colab.research.google.com/github/mosesyhc/de300-wn2024-notes/blob/main/examples/ex-linear-mr-class.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Mounting Google Drive for a permanent venv

In [None]:
# Colab-only helper: mount Google Drive into the runtime at /content/drive
from google.colab import drive
drive.mount('/content/drive')

Retrieving Java, Spark, and `findspark` in Python

In [None]:
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [None]:
!pip install -q findspark
!pip install -q seaborn

In [None]:
# spark setup: record where the JVM and the unpacked Spark distribution live
import os

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [None]:
# findspark reads SPARK_HOME (set in the previous cell) and makes the
# distribution's bundled `pyspark` package importable in this kernel
import findspark
findspark.init()

# `diamonds` Dataset

In [None]:
import seaborn as sns
# cache=True with data_home='dataset/' keeps a local copy of the CSV under
# dataset/, which is the file Spark loads later ('dataset/diamonds.csv')
diam = sns.load_dataset('diamonds', cache=True, data_home='dataset/')

# Linear regression example

In [None]:
from pyspark.sql import SparkSession

# reuse the active session if one exists, otherwise start a new one
spark = SparkSession.builder.getOrCreate()
# handle to the underlying SparkContext of that session
sc = spark.sparkContext

In [None]:
# read the CSV (first line is the header, column types are inferred) and
# keep the resulting DataFrame cached for the cells that reuse it
diamonds = spark.read.csv(
    'dataset/diamonds.csv',
    header=True,
    inferSchema=True,
).cache()

In [None]:
# By default a PySpark DataFrame has no rich notebook repr, so display() only
# prints its column summary; show the inferred schema and a small preview instead
diamonds.printSchema()
diamonds.show(5)

In [None]:
# keep only diamonds priced above 1000 and the columns used by the model
priced_above_1000 = diamonds.where('price > 1000')
df = priced_above_1000.select('cut', 'color', 'carat', 'clarity', 'price')

In [None]:
# preview the first five rows of the filtered frame
df.show(5)

## Estimation

In [None]:
# model specification: response column, predictor columns, and which of the
# predictors are categorical
response = 'price'
predictors = ['cut', 'color', 'carat', 'clarity']
categorical = {'cut', 'color', 'clarity'}

In [None]:
# map functions

def _encode(row, name):
  """Return the (key, value) one predictor contributes to the design matrix.

  Args:
    row: dict of column name -> value for a single observation.
    name: predictor column name.

  Returns:
    `(name, value)` for a numerical predictor, or `('[name]_[value]', 1)`
    for a categorical one (an indicator for the observed level).
  """
  if name in categorical:
    return f'{name}_{row[name]}', 1
  return name, row[name]

def xty_map(row):
  """Emit the contributions of one row to X^T y.

  Reads the notebook globals `predictors`, `categorical` and `response`.

  Args:
    row: a Row-like object exposing `asDict()`.

  Yields:
    `(key, x * y)` for every predictor, where `key`/`x` follow the encoding
    of `_encode` and `y` is the row's response value.
  """
  row = row.asDict()
  y = row[response]
  for i in predictors:
    key, value = _encode(row, i)
    yield (key, value * y)

def xtx_map(row):
  """Emit the contributions of one row to X^T X.

  Reads the notebook globals `predictors` and `categorical`.

  Args:
    row: a Row-like object exposing `asDict()`.

  Yields:
    `((key_i, key_j), x_i * x_j)` for every ordered pair of predictors. Both
    orderings are emitted so the full (not just triangular) matrix is filled.
  """
  row = row.asDict()
  # encode once, then reuse for every (i, j) pair
  features = [_encode(row, i) for i in predictors]
  for key_i, value_i in features:
    for key_j, value_j in features:
      yield ((key_i, key_j), value_i * value_j)

### Verifying results

In [None]:
# pull a single Row out of the frame to try the map functions on
first_rows = df.head(1)
row = first_rows[0]
row

In [None]:
# materialise everything xty_map yields for the sample row
list(xty_map(row))

In [None]:
# materialise everything xtx_map yields for the sample row
list(xtx_map(row))

### Applying map and reduce

In [None]:
# map every row to its ((key_i, key_j), value) pairs, then sum the values per
# key pair; the collected list holds one entry per element of X^T X
xtx_data = (df.rdd
            .flatMap(xtx_map)
            .reduceByKey(lambda a, b: a + b)
            .collect())

In [None]:
# map every row to its (key, value) pairs, then sum the values per key; the
# collected list holds one entry per element of X^T y
xty_data = (df.rdd
            .flatMap(xty_map)
            .reduceByKey(lambda a, b: a + b)
            .collect())

In [None]:
# inspect the collected result
xty_data

In [None]:
# give every key (first item of each xty_data entry) a position in X
index = {entry[0]: position for position, entry in enumerate(xty_data)}
p = len(index)

index

In [None]:
# place the collected elements at their positions in dense arrays
import numpy as np

# X^T y as a (p, 1) column
XTY = np.zeros((p, 1))
for name, total in xty_data:
  XTY[index[name], 0] = total

# X^T X as a (p, p) matrix
XTX = np.zeros((p, p))
for (row_name, col_name), total in xtx_data:
  XTX[index[row_name], index[col_name]] = total

In [None]:
# solve for coefficients
# With an indicator column for every level of each categorical predictor, each
# group of indicators sums to one, so the columns of X are linearly dependent
# and X^T X is singular. lstsq returns the minimum-norm solution of the normal
# equations, where np.linalg.solve would be unreliable.
beta = np.linalg.lstsq(XTX, XTY, rcond=None)[0]

In [None]:
# display the solved coefficients
beta

## Prediction

In [None]:
def predict(row):
  """Predict the response for one row from the solved coefficients.

  Each predictor is encoded as described for the map step: a numerical
  predictor keeps its name as key and contributes its value, a categorical
  one uses `[name]_[value]` as key and contributes 1. The prediction is the
  sum of contribution * coefficient over the predictors.

  Reads the notebook globals `predictors`, `categorical`, `index`
  (key -> position) and `beta` (coefficients). Only predictor columns are
  accessed, so the row does not need to carry the response.

  Args:
    row: a Row-like object exposing `asDict()`.

  Returns:
    The prediction as a Python float.

  Raises:
    KeyError: if the row contains a categorical level absent from `index`.
  """
  row = row.asDict()
  coef = np.ravel(beta)  # accept beta shaped (p,) or (p, 1)
  pred = 0.0
  for i in predictors:
    if i in categorical:
      key, value = f'{i}_{row[i]}', 1.0
    else:
      key, value = i, row[i]
    pred += coef[index[key]] * value
  return float(pred)

In [None]:
# root of the mean squared difference between prediction and observed response
rmse = np.sqrt(
    df.rdd
    .map(lambda r: (predict(r) - r[response]) ** 2)
    .mean()
)

In [None]:
# display the value computed in the previous cell
rmse

## Storing prediction with user-defined function

In [None]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import FloatType

# A UDF must have a defined return type; predict returns a Python float
predict_udf = udf(predict, FloatType())

# stores predictions in pred; struct(predictors) bundles the predictor columns
# into the single argument that predict receives
df = df.withColumn("pred", predict_udf(struct(predictors)))

In [None]:
# stores residuals in resid, computed here as prediction minus observed price
df = df.withColumn("resid", df['pred'] - df['price'])

In [None]:
# Histogram of a 10% sample (without replacement) of the residuals. The fixed
# seed keeps the sample reproducible across runs and the trailing `;` hides
# the Axes repr in the cell output.
sns.histplot(df.sample(False, 0.1, seed=0).select('resid').toPandas());