In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=ba66f48d601f57c0c21e2be13ea9eb17e92c1ad7927e45119d999503c748a094
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# The input CSV files and the Parquet outputs used by this notebook live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import functions as F
from pyspark.sql.functions import col, concat_ws, split, size, count, when

In [None]:
# Start a Spark session named "FinancialData", or reuse the one already
# running in this kernel if the cell is executed again.
spark = (
    SparkSession.builder
    .appName("FinancialData")
    .getOrCreate()
)

In [None]:
# Read the eight quarterly files (2014Q1 .. 2015Q4) into one DataFrame each.
def _read_headerless_csv(path):
    """Read `path` as a header-less CSV, dropping rows Spark cannot parse.

    No header row is consumed, so Spark names the columns `_c0`, `_c1`, ...
    With mode DROPMALFORMED, rows that fail to parse are discarded instead of
    raising an error.

    NOTE(review): the data printed further down in this notebook contains `|`
    characters, so the files look pipe-delimited while this reader uses the
    default comma separator -- confirm that is intended.
    """
    reader = spark.read.option("header", "false").option("mode", "DROPMALFORMED")
    return reader.csv(path)


# 2014
df1 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2014Q1.csv")
df2 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2014Q2.csv")
df3 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2014Q3.csv")
df4 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2014Q4.csv")
# 2015
df5 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2015Q1.csv")
df6 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2015Q2.csv")
df7 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2015Q3.csv")
df8 = _read_headerless_csv("/content/drive/MyDrive/Financial Data/2015Q4.csv")

In [None]:
# Group the quarterly frames by calendar year, in quarter order, so each year
# can be combined with a single loop.
dataframes_2014 = [
    df1,  # 2014Q1
    df2,  # 2014Q2
    df3,  # 2014Q3
    df4,  # 2014Q4
]
dataframes_2015 = [
    df5,  # 2015Q1
    df6,  # 2015Q2
    df7,  # 2015Q3
    df8,  # 2015Q4
]

In [None]:
# Collapse every 2014 quarterly frame to a single string column named
# "Combined_Column" and stack the results into one DataFrame.
#
# The expression used depends on how many columns the CSV reader produced:
#   3 columns -> "_c0" and "_c2" joined by a space
#   2 columns -> "_c0" and "_c1" joined by a space
#   1 column  -> "_c0" unchanged
# NOTE(review): in the 3-column case "_c1" is not part of the result -- confirm
# that dropping it is intentional.
merge_expr_by_width = {
    3: F.concat_ws(" ", col("_c0"), col("_c2")),
    2: F.concat_ws(" ", col("_c0"), col("_c1")),
    1: col("_c0"),
}

df_combined_2014 = None
for quarter_df in dataframes_2014:
    merge_expr = merge_expr_by_width.get(len(quarter_df.columns))
    if merge_expr is not None:
        quarter_df = quarter_df.withColumn("Combined_Column", merge_expr)

    # A frame with any other column count has no "Combined_Column", so this
    # select is where such a frame fails.
    single_col_df = quarter_df.select("Combined_Column")

    # union() appends by position; every piece has exactly one column.
    df_combined_2014 = (
        single_col_df
        if df_combined_2014 is None
        else df_combined_2014.union(single_col_df)
    )

# Preview the combined 2014 data.
df_combined_2014.show()

+--------------------+
|     Combined_Column|
+--------------------+
||100000506220|032...|
||100000506220|042...|
||100000506220|052...|
||100000506220|062...|
||100000506220|072...|
||100000506220|082...|
||100000506220|092...|
||100000506220|102...|
||100000506220|112...|
||100000506220|122...|
||100000506220|012...|
||100000506220|022...|
||100000506220|032...|
||100000506220|042...|
||100000506220|052...|
||100000506220|062...|
||100000506220|072...|
||100000506220|082...|
||100000506220|092...|
||100000506220|102...|
+--------------------+
only showing top 20 rows



In [None]:
# Collapse every 2015 quarterly frame to a single string column named
# "Combined_Column" and stack the results into one DataFrame.
#
# The expression used depends on how many columns the CSV reader produced:
#   3 columns -> "_c0" and "_c2" joined by a space
#   2 columns -> "_c0" and "_c1" joined by a space
#   1 column  -> "_c0" unchanged
# NOTE(review): in the 3-column case "_c1" is not part of the result -- confirm
# that dropping it is intentional.
merge_expr_by_width = {
    3: F.concat_ws(" ", col("_c0"), col("_c2")),
    2: F.concat_ws(" ", col("_c0"), col("_c1")),
    1: col("_c0"),
}

df_combined_2015 = None
for quarter_df in dataframes_2015:
    merge_expr = merge_expr_by_width.get(len(quarter_df.columns))
    if merge_expr is not None:
        quarter_df = quarter_df.withColumn("Combined_Column", merge_expr)

    # A frame with any other column count has no "Combined_Column", so this
    # select is where such a frame fails.
    single_col_df = quarter_df.select("Combined_Column")

    # union() appends by position; every piece has exactly one column.
    df_combined_2015 = (
        single_col_df
        if df_combined_2015 is None
        else df_combined_2015.union(single_col_df)
    )

# Preview the combined 2015 data.
df_combined_2015.show()

+--------------------+
|     Combined_Column|
+--------------------+
||100002091588|012...|
||100002091588|022...|
||100002091588|032...|
||100002091588|042...|
||100002091588|052...|
||100002091588|062...|
||100002091588|072...|
||100002091588|082...|
||100002091588|092...|
||100002091588|062...|
||100002091588|072...|
||100002091588|082...|
||100002091588|092...|
||100002091588|102...|
||100002091588|112...|
||100006803390|122...|
||100008763886|022...|
||100008763886|032...|
||100008763886|042...|
||100008763886|052...|
+--------------------+
only showing top 20 rows



In [None]:
# Break "Combined_Column" apart on the literal "|" character (escaped because
# split() takes a regular expression) and spread the pieces over columns
# col_0, col_1, ...
split_col = split(col("Combined_Column"), "\\|")

# The number of output columns is taken from the FIRST row only; rows with
# more pieces are truncated to this width and rows with fewer yield nulls.
first_row_2014 = df_combined_2014.select(size(split_col)).first()
num_cols = first_row_2014[0]

df_split_2014 = df_combined_2014.select(
    [split_col[i].alias(f"col_{i}") for i in range(num_cols)]
)
df_split_2014.show()

+-----+------------+------+-----+-----+-----+-----+-----+-----+---------+------+---------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|col_0|       col_1| col_2|col_3|col_4|col_5|col_6|col_7|col_8|    col_9|col_10|   col_11|col_12|col_13|col_14|col_15|col_16|col_17|col_18|col_19|col_20|col_21|col_22|col_23|col_24|col_25|col_26|col_27|col_28|c

In [None]:
# Break "Combined_Column" apart on the literal "|" character (escaped because
# split() takes a regular expression) and spread the pieces over columns
# col_0, col_1, ...
split_col = split(col("Combined_Column"), "\\|")

# Column count for the 2015 data, taken from the first row of the 2015 frame.
num_cols_2015 = df_combined_2015.select(size(split_col)).first()[0]

# Use the 2015 count computed above. The previous version iterated over
# `num_cols`, a variable set by the 2014 cell, so this cell silently depended
# on that cell having run and would use the wrong width if the years differ.
df_split_2015 = df_combined_2015.select(
    *(split_col[i].alias(f"col_{i}") for i in range(num_cols_2015))
)
df_split_2015.show()

+-----+------------+------+-----+------------------+--------------------+-----+-----+-----+---------+------+---------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|col_0|       col_1| col_2|col_3|             col_4|               col_5|col_6|col_7|col_8|    col_9|col_10|   col_11|col_12|col_13|col_14|col_15|col_16|col_17|col_18|col_19|col_20|c

In [None]:
# Persist both split frames to Google Drive as Parquet.
# mode("overwrite") replaces any existing output at the same path, so this
# cell can be re-run safely.
for split_df, parquet_path in (
    (df_split_2014, "/content/drive/MyDrive/Financial Data/df_split_2014.parquet"),
    (df_split_2015, "/content/drive/MyDrive/Financial Data/df_split_2015.parquet"),
):
    split_df.write.mode("overwrite").parquet(parquet_path)