In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, col, split
from pyspark.ml.feature import Imputer, PCA
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as spark_sum, pow, row_number, monotonically_increasing_id, when, lit
from pyspark.sql.functions import col, sum as spark_sum, pow, row_number, monotonically_increasing_id, when, lit, lead, isnan
from pyspark.sql.functions import col, mean, stddev
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as spark_sum, pow, row_number, monotonically_increasing_id, when, lit
import numpy as np
import pandas as pd

In [0]:
csv_directory_path = "dbfs:/FileStore/tables"
files = spark.sparkContext.wholeTextFiles(csv_directory_path).keys().collect()
csv_files = [file for file in files if file.endswith(".csv")]
# for file_name in csv_files:
#     dbutils.fs.rm(file_name)
# %fs rm dbfs:/FileStore/*.csv

In [0]:
spark = SparkSession.builder.appName("AirML").getOrCreate()

num_csv_files = len(csv_files)
print(f"Number of CSV files in the directory: {num_csv_files}")

Number of CSV files in the directory: 300


In [0]:
# print(csv_files)
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv_files)
    # .option("mergeSchema", "true") \
# print(df.count())
df = df.select("LONGITUDE", "LATITUDE", "ELEVATION", "WND", "CIG", "VIS", "TMP", "DEW", "SLP")
sampled_rows = df.rdd.takeSample(False, 45000, seed=177)
df = spark.createDataFrame(sampled_rows, df.schema)
df.persist()
print(df.count())
# df.show(3)

45000


In [0]:
from pyspark.sql.functions import substring_index, split, col

df = df.withColumn("WND_DIR", split(col("WND"), ",").getItem(0))
df = df.withColumn("WND_TYPE", split(col("WND"), ",").getItem(2))
df = df.withColumn("WND_SPD", split(col("WND"), ",").getItem(3))
df = df.withColumn("CEL_H", split(col("CIG"), ",").getItem(0))
df = df.withColumn("VIS_DIST", split(col("VIS"), ",").getItem(0))
df = df.withColumn("VIS_VAR", split(col("VIS"), ",").getItem(2))
df = df.withColumn("AIR_TMP", split(col("TMP"), ",").getItem(0))
df = df.withColumn("DEW_TMP", split(col("DEW"), ",").getItem(0))
df = df.withColumn("PRESSURE", split(col("SLP"), ",").getItem(0))
df = df.select("LONGITUDE", "LATITUDE", "ELEVATION", "WND_DIR", "WND_TYPE", "WND_SPD", "CEL_H", "VIS_DIST", "VIS_VAR", "AIR_TMP", "DEW_TMP", "PRESSURE")
# df = df.select("LONGITUDE", "LATITUDE", "ELEVATION", "WND_DIR", "WND_SPD", "CEL_H", "VIS_DIST", "AIR_TMP", "DEW_TMP", "PRESSURE")

In [0]:
numeric_cols = ["LONGITUDE", "LATITUDE", "ELEVATION", "WND_DIR", "WND_SPD", "CEL_H", "VIS_DIST", "AIR_TMP", "DEW_TMP"]
# categorical_cols = []
categorical_cols = ["WND_TYPE", "VIS_VAR"]
all_cols = numeric_cols + categorical_cols

In [0]:
# Do not execute the above code unless you want to deal with categorical vars
indexers = [StringIndexer(inputCol=col, outputCol=col + "_Index") for col in categorical_cols]

encoders = [OneHotEncoder(inputCol=col + "_Index", outputCol=col + "_ONE") for col in categorical_cols]

pipeline_stages = indexers + encoders 
pipeline = Pipeline(stages=pipeline_stages)

df = pipeline.fit(df).transform(df)
df = df.drop("WND_TYPE")
df = df.drop("VIS_VAR")
df = df.drop("WND_TYPE_Index")
df = df.drop("VIS_VAR_Index")
df = df.withColumnRenamed("VIS_VAR_ONE", "VIS_VAR").withColumnRenamed("WND_TYPE_ONE", "WND_TYPE")

Downloading artifacts:   0%|          | 0/30 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
df = df.select("LONGITUDE", "LATITUDE", "ELEVATION", "WND_DIR", "WND_TYPE", "WND_SPD", "CEL_H", "VIS_DIST", "VIS_VAR", "AIR_TMP", "DEW_TMP", "PRESSURE")
# df = df.select("LONGITUDE", "LATITUDE", "ELEVATION", "WND_DIR", "WND_SPD", "CEL_H", "VIS_DIST", "AIR_TMP", "DEW_TMP", "PRESSURE")
df.show(2)
print(df.count())

+---------+--------+---------+-------+-------------+-------+-----+--------+-------------+-------+-------+--------+
|LONGITUDE|LATITUDE|ELEVATION|WND_DIR|     WND_TYPE|WND_SPD|CEL_H|VIS_DIST|      VIS_VAR|AIR_TMP|DEW_TMP|PRESSURE|
+---------+--------+---------+-------+-------------+-------+-----+--------+-------------+-------+-------+--------+
|-78.93818| 43.1083|    178.3|    110|(5,[0],[1.0])|   0015|00762|  016093|(2,[1],[1.0])|  +0228|  +0206|   99999|
|-104.7552| 40.8066|   1642.9|    999|(5,[1],[1.0])|   9999|99999|  999999|(2,[0],[1.0])|  +0041|  +9999|   99999|
+---------+--------+---------+-------+-------------+-------+-----+--------+-------------+-------+-------+--------+
only showing top 2 rows

45000


In [0]:
# Cannot predict if the response variable is missing
import numpy as np
from pyspark.sql.functions import when

df = df.withColumn("LONGITUDE", when(col("LONGITUDE") == 999999, np.nan).otherwise(col("LONGITUDE"))) \
       .withColumn("LATITUDE", when(col("LATITUDE") == 99999, np.nan).otherwise(col("LATITUDE"))) \
       .withColumn("ELEVATION", when(col("ELEVATION") == 9999, np.nan).otherwise(col("ELEVATION"))) \
       .withColumn("WND_DIR", when(col("WND_DIR") == 999, np.nan).otherwise(col("WND_DIR"))) \
       .withColumn("WND_SPD", when(col("WND_SPD") == 9999, np.nan).otherwise(col("WND_SPD"))) \
       .withColumn("CEL_H", when(col("CEL_H") == 99999, np.nan).otherwise(col("CEL_H"))) \
       .withColumn("VIS_DIST", when(col("VIS_DIST") == 999999, np.nan).otherwise(col("VIS_DIST"))) \
       .withColumn("AIR_TMP", when(col("AIR_TMP") == 9999, np.nan).otherwise(col("AIR_TMP"))) \
       .withColumn("DEW_TMP", when(col("DEW_TMP") == 9999, np.nan).otherwise(col("DEW_TMP")))

df = df.filter(df.PRESSURE != 99999)

print(df.count())
df.show(3)

17961
+-----------+----------+---------+-------+-------------+-------+-----+--------+-------------+-------+-------+--------+
|  LONGITUDE|  LATITUDE|ELEVATION|WND_DIR|     WND_TYPE|WND_SPD|CEL_H|VIS_DIST|      VIS_VAR|AIR_TMP|DEW_TMP|PRESSURE|
+-----------+----------+---------+-------+-------------+-------+-----+--------+-------------+-------+-------+--------+
|-36.6166666|-9.4166666|    276.5|    360|(5,[0],[1.0])|   0018|  NaN|     NaN|(2,[0],[1.0])|  +0222|    NaN|   10176|
|158.2166667| 6.9666667|     39.0|    120|(5,[0],[1.0])|   0015|09144|  024140|(2,[0],[1.0])|  +0232|  +0229|   10113|
| 46.5833333|56.3333333|    107.0|    200|(5,[0],[1.0])|   0010|22000|  010000|(2,[0],[1.0])|  +0203|  +0159|   10124|
+-----------+----------+---------+-------+-------------+-------+-----+--------+-------------+-------+-------+--------+
only showing top 3 rows



In [0]:
# df.persist()
oodf = df
oodf.persist()

DataFrame[LONGITUDE: string, LATITUDE: string, ELEVATION: string, WND_DIR: string, WND_TYPE: vector, WND_SPD: string, CEL_H: string, VIS_DIST: string, VIS_VAR: vector, AIR_TMP: string, DEW_TMP: string, PRESSURE: string]

In [0]:
# assembler_numeric = VectorAssembler(inputCols=numeric_cols, outputCol="numerical_features")
# assembler_all = VectorAssembler(inputCols=["scaled_numerical_features"] + ["WND_TYPE", "VIS_VAR"], outputCol="features")
# pipeline = Pipeline(stages=[assembler_numeric, assembler_all])
# df = pipeline.fit(df).transform(df)

In [0]:
print(oodf.count())

num_df = oodf.select(*numeric_cols, "PRESSURE")
cat_df = oodf.select(*categorical_cols)

for col_name in numeric_cols + ["PRESSURE"]:
    num_df = num_df.withColumn(col_name, 
                       when(~isnan(col(col_name)), col(col_name).cast(DoubleType()))
                       .otherwise(np.nan))
num_df = reconstruct_missing_vals(num_df)

num_df = add_index(num_df)
cat_df = add_index(cat_df)
joined_df = num_df.join(cat_df, on="index")
df = joined_df.select(*all_cols, "PRESSURE")
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
df = assembler.transform(df)
from pyspark.storagelevel import StorageLevel
# df.persist(StorageLevel.MEMORY_ONLY)
df.persist()
df.show(3)
print(df.count())

17961
+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------+-------------+--------+--------------------+
|          LONGITUDE|           LATITUDE|           ELEVATION|            WND_DIR|            WND_SPD|              CEL_H|           VIS_DIST|           AIR_TMP|            DEW_TMP|     WND_TYPE|      VIS_VAR|PRESSURE|            features|
+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------+-------------+--------+--------------------+
|-0.3499966248135452| -1.146269281536541|-0.02330227515780...| 1.8167372340807275|-0.5581181642885598| 1.6315420631295205|  1.132230272540409|0.7089061816297165|0.09084793007516523|(5,[0],[1.0])|(2,[0],[1.0])| 10176.0|[-0.3499966248135...|
| 1.7795548730154005|-0.6469847598

In [0]:
models = [("Linear Regression", LinearRegression(featuresCol="features", labelCol="PRESSURE")), ("Gradient Boosted Trees", GBTRegressor(featuresCol="features", labelCol="PRESSURE",
                   maxDepth=7, maxIter=100, stepSize=0.1, subsamplingRate=0.75))]
# models = [LinearRegression(featuresCol="features", labelCol="PRESSURE")]
for name, model in models:
    pipeline = Pipeline(stages=[model])

    train_data, test_data = df.randomSplit([0.7, 0.3], seed=1777)

    predictions = pipeline.fit(train_data).transform(test_data)

    evaluator = RegressionEvaluator(labelCol="PRESSURE", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(predictions)

    print(f"Root Mean Squared Error (RMSE) on test data for {name} = {rmse}")

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Root Mean Squared Error (RMSE) on test data for Linear Regression = 89.39472338690288


Downloading artifacts:   0%|          | 0/14 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

Root Mean Squared Error (RMSE) on test data for Gradient Boosted Trees = 78.80678487809404


In [0]:
def add_index(df):
    window_spec = Window.orderBy(lit(1))
    
    df_with_index = df.withColumn("index", row_number().over(window_spec) - 1)
    return df_with_index

In [0]:
data = [
    (1, 1),
    (2, 1),
    (2, 1),
    (4, 2),
    (4, 2),
    (4, 2),
    (5, 8),
    (8, 8),
    (9, 8),
]
columns = ["A", "Y"]

df = spark.createDataFrame(data, columns)

window_spec_index = Window.orderBy("A")
df_with_index = df.withColumn("index", row_number().over(window_spec_index))
N = df_with_index.count()  
df_with_index = df_with_index.withColumn(
    "remaining",
    when((lit(N) - col("index")) == 0, 1).otherwise(lit(N) - col("index"))
)

df_sorted = df_with_index.orderBy(col("A"))

window_spec_prefix = Window.orderBy("A").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# window_spec_suffix = Window.orderBy("A").rowsBetween(Window.currentRow, Window.unboundedFollowing)

df_with_sums = df_sorted.withColumn("prefix_sum_Y", spark_sum(col("Y")).over(window_spec_prefix)) \
                        .withColumn("prefix_squared_sum_Y", spark_sum(pow(col("Y"), 2)).over(window_spec_prefix)) \
                        # .withColumn("suffix_sum_Y", spark_sum(col("Y")).over(window_spec_suffix))

df_with_sums.show()

+---+---+-----+---------+------------+--------------------+
|  A|  Y|index|remaining|prefix_sum_Y|prefix_squared_sum_Y|
+---+---+-----+---------+------------+--------------------+
|  1|  1|    1|        8|           1|                 1.0|
|  2|  1|    2|        7|           2|                 2.0|
|  2|  1|    3|        6|           3|                 3.0|
|  4|  2|    4|        5|           5|                 7.0|
|  4|  2|    5|        4|           7|                11.0|
|  4|  2|    6|        3|           9|                15.0|
|  5|  8|    7|        2|          17|                79.0|
|  8|  8|    8|        1|          25|               143.0|
|  9|  8|    9|        1|          33|               207.0|
+---+---+-----+---------+------------+--------------------+



In [0]:
tot_prefix_sum_Y = df_with_sums.orderBy("prefix_sum_Y", ascending=False).limit(1).collect()[0]["prefix_sum_Y"]

tot_prefix_squared_sum_Y = df_with_sums.orderBy("prefix_squared_sum_Y", ascending=False).limit(1).collect()[0]["prefix_squared_sum_Y"]

# print(tot_prefix_squared_sum_Y)
N = df_with_sums.count()  

df_final = df_with_sums.withColumn(
    "RSS",
    tot_prefix_squared_sum_Y - 
    pow(col("prefix_sum_Y"), 2) / col("index") -
    pow(tot_prefix_sum_Y-col("prefix_sum_Y"), 2) / col("remaining")
)

df_final.show()
# O(n) rather than O(n^2)

+---+---+-----+---------+------------+--------------------+-----------------+
|  A|  Y|index|remaining|prefix_sum_Y|prefix_squared_sum_Y|              RSS|
+---+---+-----+---------+------------+--------------------+-----------------+
|  1|  1|    1|        8|           1|                 1.0|             78.0|
|  2|  1|    2|        7|           2|                 2.0|67.71428571428572|
|  2|  1|    3|        6|           3|                 3.0|             54.0|
|  4|  2|    4|        5|           5|                 7.0|43.94999999999999|
|  4|  2|    5|        4|           7|                11.0|28.19999999999999|
|  4|  2|    6|        3|           9|                15.0|              1.5|
|  5|  8|    7|        2|          17|                79.0|37.71428571428572|
|  8|  8|    8|        1|          25|               143.0|           64.875|
|  9|  8|    9|        1|          33|               207.0|             86.0|
+---+---+-----+---------+------------+--------------------+-----

In [0]:
window_spec = Window.orderBy("index")
from pyspark.sql.functions import col, sum as spark_sum, pow, row_number, monotonically_increasing_id, when, lit, lead

df_final = df_final.withColumn("lead_A", lead("A").over(window_spec))
df_final = df_final.withColumn(
    "RSS",
    when((col("lead_A").isNotNull()) & (col("lead_A") <= col("A")), lit(float("inf"))).otherwise(col("RSS"))
)

df_final = df_final.drop("lead_A")
df_final.show()
df_final_sorted = df_final.orderBy(col("RSS"))

row0 = df_final_sorted.limit(1).collect()[0]
A, min_RSS = row0["A"], row0["RSS"]
print(A, min_RSS)

+---+---+-----+---------+------------+--------------------+-----------------+
|  A|  Y|index|remaining|prefix_sum_Y|prefix_squared_sum_Y|              RSS|
+---+---+-----+---------+------------+--------------------+-----------------+
|  1|  1|    1|        8|           1|                 1.0|             78.0|
|  2|  1|    2|        7|           2|                 2.0|         Infinity|
|  2|  1|    3|        6|           3|                 3.0|             54.0|
|  4|  2|    4|        5|           5|                 7.0|         Infinity|
|  4|  2|    5|        4|           7|                11.0|         Infinity|
|  4|  2|    6|        3|           9|                15.0|              1.5|
|  5|  8|    7|        2|          17|                79.0|37.71428571428572|
|  8|  8|    8|        1|          25|               143.0|           64.875|
|  9|  8|    9|        1|          33|               207.0|             86.0|
+---+---+-----+---------+------------+--------------------+-----

In [0]:
def compute_diff(df1, df2):
    for col_name in df2.columns:
        df2 = df2.withColumnRenamed(col_name, f"{col_name}_2")
    
    df1 = add_index(df1)
    df2 = add_index(df2)

    joined_df = df1.join(df2, on="index", how="inner")

    columns = []
    for col_name in numeric_cols:
        if col_name != "index": columns.append(col_name)
        
    sum_of_squared_diff = joined_df.select(
        *[
            spark_sum(
                when(
                    isnan(col(f"{col_name}")) | isnan(col(f"{col_name}_2")),
                    0
                ).otherwise(pow(col(f"{col_name}") - col(f"{col_name}_2"), 2))
            ).alias(f"sum_of_squared_{col_name}")
            for col_name in columns
        ]
    )

    total_sum_of_squared_diff = sum_of_squared_diff.select(
        sum([spark_sum(col_name)
        for col_name in sum_of_squared_diff.columns]).alias("total_sum_of_squared_diff")
    )

    # total_sum_of_squared_diff.show()
    return total_sum_of_squared_diff.collect()[0][0]
    
def replace_nans(df1, df2):
    for col_name in df2.columns:
        df2 = df2.withColumnRenamed(col_name, f"{col_name}_2")

    df1 = add_index(df1)
    df2 = add_index(df2)
    joined_df = df1.join(df2, on="index")

    for column in numeric_cols:
        if column != "index": 
            joined_df = joined_df.withColumn(
                column,
                when(~isnan(col(f"{column}")), col(f"{column}")).otherwise(col(f"{column}_2"))
            )
    
    cols_to_drop = [col for col in joined_df.columns if col.endswith('_2')] + ['index']
    result_df = joined_df.drop(*cols_to_drop)
    return result_df

def keep_nans(df1, df2):
    for col_name in df2.columns:
        df2 = df2.withColumnRenamed(col_name, f"{col_name}_2")

    df1 = add_index(df1)
    df2 = add_index(df2)
    joined_df = df1.join(df2, on="index")

    for column in numeric_cols:
        if column != "index": 
            joined_df = joined_df.withColumn(
                column,
                when(isnan(col(f"{column}")), np.nan).otherwise(col(f"{column}_2"))
            )
    
    cols_to_drop = [col for col in joined_df.columns if col.endswith('_2')] + ['index']
    result_df = joined_df.drop(*cols_to_drop)
    return result_df

In [0]:
# data = [
#     (np.nan, 1.0, 0.0, 0.8414709848078965),
#     (2.0, 4.0, 0.6931471805599453, 0.9092974268256817),
#     (3.0, 9.0, 1.0986122886681098, np.nan),
#     (4.0, np.nan, 1.3862943611198906, -0.7568024953079282),
#     (5.0, 25.0, 1.6094379124341003, np.nan),
#     (6.0, 36.0, 1.791759469228055, -0.27941549819892586),
#     (7.0, np.nan, 1.9459101490553132, 0.6569865987187891),
#     (8.0, 64.0, np.nan, 0.9893582466233818),
#     (9.0, 81.0, 2.1972245773362196, 0.4121184852417566),
#     (10.0, 100.0, np.nan, -0.5440211108893698),
#     (11.0, 121.0, 2.3978952727983707, -0.9999902065507035),
#     (12.0, 144.0, 2.4849066497880004, -0.5365729180004349),
#     (13.0, 169.0, np.nan, 0.4201670368266409),
#     (14.0, 196.0, 2.6390573296152584, 0.9906073556948704),
#     (np.nan, 225.0, 2.70805020110221, 0.6502878401571168)
# ]

# columns = ["a", "b", "c", "d"]
# df = spark.createDataFrame(data, columns)
def reconstruct_missing_vals(df):
    odf = df # Original DF

    columns = numeric_cols
    imputer = Imputer(inputCols=columns, outputCols=columns).setStrategy("mean")
    df = imputer.fit(df).transform(df)

    df = standardize_df(df)
    df_standardized_with_nan = keep_nans(odf, df)

    # Iterations for feature reconstruction
    for i in range(7):
        assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
        df = assembler.transform(df)

        pca = PCA(k=4, inputCol="features", outputCol="pcaFeatures")
        model = pca.fit(df)

        result = model.transform(df).select("features", "pcaFeatures")

        pc_matrix = model.pc.toArray().transpose()

        # pc_matrix broadcast? Down below
        pc_vectors = [Vectors.dense(row) for row in pc_matrix]
        # pc_vectors = spark.sparkContext.broadcast(pc_v)
        # pc_matrix is small and can easily fit into memory
        # print(pc_matrix.shape, pc_matrix)
        def reconstruct_features_udf(pca_features):
            reconstructed_features = Vectors.dense(
                [sum(pca_feature * pc_vector[i] for pca_feature, pc_vector in zip(pca_features, pc_vectors)) for i in range(len(pc_vectors[0]))]
            )
            return reconstructed_features

        reconstruct_features = udf(reconstruct_features_udf, VectorUDT())

        reconstructed_df = result.withColumn("reconstructed_features", reconstruct_features(col("pcaFeatures"))).drop("pcaFeatures").drop("features").withColumnRenamed("reconstructed_features", "features")

        # reconstructed_df.show(truncate=False)
        recon_df = reverse_vec_assemble(reconstructed_df, df.columns)
        diff = compute_diff(recon_df, df_standardized_with_nan)
        # print(diff)
        df = replace_nans(df_standardized_with_nan, recon_df)

    return df

In [0]:

def standardize_df(df):
    for column in numeric_cols:
        mean_val = df.select(mean(col(column))).first()[0]
        stddev_val = df.select(stddev(col(column))).first()[0]
        df = df.withColumn(column, (col(column) - mean_val) / stddev_val)
    return df

# data = [(1, 2), (2, 4), (3, 6)]
# columns = ["col1", "col2"]
# df = spark.createDataFrame(data, columns)
# df = standardize_df(df)
# df.show()

def reverse_vec_assemble(df_assembled, columns):
    vector_to_array = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

    df_array = df_assembled.withColumn("features_array", vector_to_array("features"))

    for i, col_name in enumerate(columns):
        df_array = df_array.withColumn(col_name, df_array["features_array"].getItem(i))

    df_array = df_array.drop("features_array").drop("features")
    return df_array