In [None]:
#Libraries
%pip install pyspark
%pip install pandas
%pip install pyarrow
%pip install grpcio
%pip install google
%pip install protobuf
%pip install grpcio_status
%pip install --upgrade pyspark-stubs # for IntelliSense

In [None]:
# Show installed packages whose name contains "pyspark", to confirm the version this kernel uses.
# NOTE(review): piping to `grep` needs a POSIX shell; `%pip show pyspark` is a portable alternative.
%pip list | grep pyspark

In [None]:
# Full package listing for this kernel's environment (long output).
%pip list

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, lit, format_number
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import Imputer

### Spark connect
__Workloads are easier to maintain__  
Local library versions don't need to match the production servers. Essentially, the client query is converted into an unresolved logical plan, which is sent to the Spark server.  

__Lets you build Spark Connect Clients in non-JVM languages__  
Decouples the client from the Spark Driver, so a Spark Connect client can be written in any language. Examples:
- Spark Connect Python
- Spark Connect Go
- Spark Connect Rust

__Allows for better remote development and testing__  
Without Spark Connect, working with Spark from your own machine requires either a local Spark installation or an SSH connection to the remote Spark Driver. Spark Connect removes the need for SSH: you connect to the remote Spark Driver directly, which is more convenient and more secure.  

In [None]:
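# Spark Connect is easier to maintain: your local library versions (Spark, etc.) don't need to match the production servers.
# Essentially, the client query is converted to an unresolved logical plan, which is passed to the Spark server.

# Interrupted after 5 minutes; the session never started... something to explore in the future.
# NOTE(review): possibly no Spark Connect server was listening at sc://localhost (default port 15002);
# the server has to be started separately (e.g. sbin/start-connect-server.sh) — TODO confirm.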

# sparkConnect = SparkSession.builder\
#     .remote("sc://localhost")\
#     .appName("Practice")\
#     .getOrCreate()
# sparkConnect

#sparkConnect.stop()

In [None]:
# Create (or reuse) the SparkSession for this notebook.
# Master "local" runs Spark in-process with a single worker thread.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Practice")
    .getOrCreate()
)
spark

# High-level data interactions  

Part 1
- reading csv
- types and schema
- select, describe, creating/renaming a column, dropping columns 

In [None]:
# reading a csv file: the first line holds column names, and column types are inferred from the data
# NOTE: columns containing the literal string 'NA' are inferred as strings, not numbers
houseDf = spark.read.csv('data/train.csv', header=True, inferSchema=True)
houseDf.show()

In [None]:
# the Python class of the object returned by spark.read
houseDf.__class__

In [None]:
# print the inferred column names, types and nullability as a tree
houseDf.printSchema()

In [None]:
# first 3 rows, returned to the driver as a list of Row objects
houseDf.take(3)

In [None]:
# project just two columns and display them
houseDf.select('Id', 'MSSubClass').show()

In [None]:
# (column name, type string) pairs, built straight from the schema
[(field.name, field.dataType.simpleString()) for field in houseDf.schema.fields]

In [None]:
# summary statistics (count, mean, stddev, min, max) for every column
houseDf.describe().show(n=20, truncate=True)

In [None]:
# add a derived column: SalePrice shifted up by 1000
houseDf = houseDf.withColumn('SalePriceIncreased', col('SalePrice') + 1000)
houseDf.show()

In [None]:
# remove the derived column again by keeping every other column (order preserved)
houseDf = houseDf.select([c for c in houseDf.columns if c != 'SalePriceIncreased'])
houseDf.show()

In [None]:
# rename for display only: the result is not assigned back, so houseDf keeps 'SalePrice'
houseDf.withColumnRenamed('SalePrice', 'SalePriceNew').show()

Part 2 - Handling missing values
- dropping rows/columns
- parameters of the drop functions (`how`, `thresh`, `subset`)
- filling and imputing missing values

In [None]:
# look at the raw values before handling missing data (first 20 rows)
houseDf.show(n=20)

In [None]:
# The CSV spells missing values as the text 'NA'; turn those into real nulls
# so that null counting and the na.* functions recognise them.
houseDf = houseDf.replace('NA', None)

In [None]:
def nullMetrics(df):
    """Count missing values per column and show them as counts and fractions.

    A value counts as missing when it is null, or NaN in a float/double column.
    Both result tables are displayed with ``show()`` before being returned.

    Args:
        df: the Spark DataFrame to inspect.

    Returns:
        (nullCountsDf, nullPercentDf): two single-row DataFrames with the same
        column names as ``df``. ``nullCountsDf`` holds the missing-value count
        per column. ``nullPercentDf`` holds count / total rows as a string
        formatted to 2 decimals (a fraction between 0 and 1, not a percentage),
        or null for every column when ``df`` has no rows.
    """
    from pyspark.sql.types import DoubleType, FloatType

    totalRows = df.count()
    floatCols = {f.name for f in df.schema.fields
                 if isinstance(f.dataType, (FloatType, DoubleType))}

    def _isMissing(c):
        # isnan is only meaningful for float/double columns; applying it to string
        # columns forces an implicit cast to double, which may raise under ANSI mode.
        cond = col(c).isNull()
        return (cond | isnan(col(c))) if c in floatCols else cond

    # when(cond, 1) yields the literal 1 for missing rows and null otherwise, and
    # count() skips nulls, so this counts missing rows. The value must be a non-null
    # literal: when(cond, col(c)) would yield the null value itself and count 0.
    # The aggregation stays a DataFrame (no collect + re-create via the global `spark`).
    nullCountsDf = df.select([count(when(_isMissing(c), lit(1))).alias(c)
                              for c in df.columns])
    nullCountsDf.show()

    def _fraction(c):
        # guard the empty DataFrame: dividing by zero errors under ANSI mode
        if totalRows == 0:
            return lit(None).cast("string").alias(c)
        return format_number(col(c) / lit(totalRows), 2).alias(c)

    nullPercentDf = nullCountsDf.select(*[_fraction(c) for c in nullCountsDf.columns])
    nullPercentDf.show()

    return nullCountsDf, nullPercentDf

# trailing ';' hides the returned tuple's repr; both tables were already shown above
nullMetrics(houseDf);

In [None]:

# Compare row counts under different na.drop() settings. Nothing is assigned back,
# so houseDf itself is unchanged; each count is the number of rows that remain.
print('total records:', houseDf.count())
print('dropping rows with any null values:', houseDf.na.drop().count())
print('dropping rows with all null values:', houseDf.na.drop(how='all').count())
# thresh=N keeps rows with at least N non-null values and overrides `how`,
# i.e. it drops rows that have FEWER than N non-null values.
print('dropping rows with fewer than 2 non-null values (thresh=2):', houseDf.na.drop(thresh=2).count())

print('dropping rows with any null values in subset of columns', houseDf.na.drop(how='any', subset=['Alley']).count())

In [None]:
# filling the missing values with a placeholder string; a string fill value only
# applies to string columns, and nothing is assigned back to houseDf
houseDf.fillna('Missing Values').show()
houseDf.fillna('Missing Values', subset=['Alley']).show()

In [None]:
# houseDf is unchanged by the fill preview above
houseDf.show(20, truncate=True)

In [None]:
# check column types before imputing; LotFrontage is cast to int in the next cell,
# so it is presumably still non-numeric here
houseDf.printSchema()

In [None]:
# Imputer only accepts numeric input columns, so convert LotFrontage to integer first
houseDf = houseDf.withColumn("LotFrontage", houseDf["LotFrontage"].cast(IntegerType()))

In [None]:
# Use Imputer to fill missing values with the column mean. Results go to new
# "<col>_imputed" columns; the original columns keep their nulls.
# NOTE(review): the imputed value is likely cast back to the input type (int here),
# which would truncate the mean — confirm if fractional means matter.
missingCols = ['LotFrontage']
imputedCols = ["{}_imputed".format(c) for c in missingCols]
imputer = Imputer(inputCols=missingCols, outputCols=imputedCols, strategy="mean")

# Drop output columns left over from an earlier run of this cell. The Imputer
# refuses to write to an existing column, so without this a re-run fails.
# drop() silently ignores names that are not present.
houseDf = houseDf.drop(*imputedCols)
houseDf = imputer.fit(houseDf).transform(houseDf)

In [None]:
# re-check missing values: LotFrontage_imputed should now report 0 missing values.
# The trailing ';' hides the returned tuple's repr; the function already shows both tables.
nullMetrics(houseDf);