# Data Ingestion and Cleaning

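## Convert the date column to a proper date type
Use PySpark SQL functions to convert the `date` column of every table from `M/d/yyyy` strings to Spark's date type in one reusable step. This is done with DataFrame transformations because Spark SQL did not allow `UPDATE` statements on these tables.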

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, to_date, date_format

# Every source table lives in the same catalog/schema; only the table name varies.
SOURCE_SCHEMA = 'financial_sectors.default'
TABLE_NAMES = [
    'amazon',
    'tesla',
    'apple',
    'microsoft',
    'eli_lily',
    'united_health',
    'berkshire_b',
    'jp_morgan',
]

# Map each short table name to its Spark DataFrame so the later cells can
# loop over all tables. To ingest another table, add its name to TABLE_NAMES.
dataframes = {name: spark.table(f'{SOURCE_SCHEMA}.{name}') for name in TABLE_NAMES}

def normalize_date(column_name: str, data_frame, source_format: str = 'M/d/yyyy'):
    """Convert a string date column to Spark's DateType.

    Parameters
    ----------
    column_name : str
        Name of the column to convert (e.g. 'date').
    data_frame : pyspark.sql.DataFrame
        Frame that may contain the column.
    source_format : str, optional
        Spark datetime pattern the stored strings follow. The default
        'M/d/yyyy' parses month and day written with one or two digits.

    Returns
    -------
    pyspark.sql.DataFrame
        A new frame whose `column_name` column is DateType, or `data_frame`
        itself, unchanged, when it has no such column. With Spark's default
        (non-ANSI) settings, strings that do not match `source_format`
        become null.
    """
    if column_name not in data_frame.columns:
        # Hand the frame back untouched rather than returning None, which
        # would silently replace the table in the caller's collection.
        return data_frame
    # A single to_date() already produces DateType; formatting to an
    # 'MM/dd/yyyy' string and parsing it again yields the same date.
    return data_frame.withColumn(column_name, to_date(col(column_name), source_format))

# Replace every table's DataFrame with its date-normalized version.
for table_name, frame in list(dataframes.items()):
    print(f"normalizing date for {table_name} dataframe")
    dataframes[table_name] = normalize_date('date', frame)


normalizing date for amazon dataframe
+----------+------+------+------+------+---------+
|      date|  open|  high|   low| close|   volume|
+----------+------+------+------+------+---------+
|1997-05-16| 0.093|0.0987|0.0852|0.0863|294000000|
|1997-05-19|0.0852|0.0883|0.0811|0.0852|122136000|
|1997-05-20|0.0863|0.0873|0.0816|0.0816|109344000|
|1997-05-21|  0.08|0.0821|0.0686|0.0712|377064000|
|1997-05-22|0.0717|0.0722|0.0655|0.0696|235536000|
|1997-05-23|0.0702|0.0759|0.0665|0.0748|318744000|
|1997-05-27|0.0738|0.0821|0.0727| 0.079|173952000|
|1997-05-28|0.0803|0.0816|0.0764|0.0764| 91488000|
|1997-05-29|0.0769|0.0769|0.0738|0.0751| 69456000|
|1997-05-30|0.0748|0.0754|0.0738|0.0748| 51888000|
|1997-06-02|0.0754|0.0764|0.0748|0.0754| 11832000|
|1997-06-03|0.0764|0.0764|0.0738|0.0738| 23664000|
|1997-06-04|0.0738|0.0743|0.0696|0.0707| 61608000|
|1997-06-05|0.0707|0.0769|0.0686|0.0769|113448000|
|1997-06-06|0.0754|0.0852|0.0754|0.0826|156144000|
|1997-06-09|0.0842|0.0852|0.0826|0.0842| 470

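## Additional cleaning
Check for null values that would need to be removed. With Spark's default (non-ANSI) settings, the date conversion above turns any date string it cannot parse into a null, so this check also catches malformed dates.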


In [0]:
%python
# from pyspark.sql.functions import col, sum

null_counts = dataframes['amazon'].select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in dataframes['amazon'].columns]
)
display(null_counts)
# taken from the assistant just because it worked right away.
# col(c): This function creates a column object for the column name c.
# col(c).isNull(): This checks if the column value is null.
# col(c).isNull().cast("int"): This casts the boolean result of isNull() to an integer (1 for null, 0 for non-null).
# sum(col(c).isNull().cast("int")): This sums up the integer values, effectively counting the number of nulls in the column.
# .alias(c): This gives the resulting column the same name as the original column.



date,open,high,low,close,volume
0,0,0,0,0,0


### Save each cleaned DataFrame as a `<name>_cleaned` table (overwriting any earlier `_cleaned` version)

In [0]:
# Persist every normalized table as '<name>_cleaned'. The name is not
# qualified with a catalog/schema, so the table goes into the session's
# current one; mode('overwrite') replaces a same-named table from an earlier run.
for table_name, cleaned_frame in dataframes.items():
    writer = cleaned_frame.write.mode('overwrite')
    writer.saveAsTable(f'{table_name}_cleaned')

### Initial Ingestion Complete