In [19]:
import os
import glob
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RecursiveCSVLoader").getOrCreate()

# Root directory that is searched recursively for input files.
data_dir = "./"

# glob returns matches in a filesystem-dependent order; sorting makes the
# insertion order of `dfs` (and anything derived from it) reproducible.
csv_files = sorted(glob.glob(os.path.join(data_dir, "**", "*.csv"), recursive=True))
json_files = sorted(glob.glob(os.path.join(data_dir, "**", "*.json"), recursive=True))

# Maps a short key to the DataFrame loaded from the matching file.
dfs = {}

# CSV files are keyed by their bare file name (no directory, no extension).
for file_path in csv_files:
    file_name = os.path.splitext(os.path.basename(file_path))[0]
    if file_name in dfs:
        # Same base name in two directories: the later file replaces the
        # earlier one, so make that visible instead of doing it silently.
        print(f"Warning: key {file_name!r} already loaded; overwriting with {file_path}")
    dfs[file_name] = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

# JSON files are keyed by their path relative to data_dir, with path
# separators turned into underscores and the extension dropped.
for file_path in json_files:
    relative_path = os.path.relpath(file_path, data_dir)
    # splitext removes only the trailing extension (str.replace would also strip
    # ".json" from the middle of a path); os.sep keeps this portable.
    key_name = os.path.splitext(relative_path)[0].replace(os.sep, "_")
    if key_name in dfs:
        print(f"Warning: key {key_name!r} already loaded; overwriting with {file_path}")
    dfs[key_name] = spark.read.option("multiline", True).json(file_path)

for name in dfs:
    print(f"Loaded: {name}")


Loaded: google_trends__2025-05-13
Loaded: google_trends__2025-05-14
Loaded: google_trends__2025-05-15
Loaded: google_trends__2025-05-16
Loaded: google_trends__2025-05-17
Loaded: google_trends__2025-05-18
Loaded: google_trends__2025-05-19
Loaded: google_trends__2025-05-20
Loaded: google_trends__2025-05-21
Loaded: google_trends__2025-05-22
Loaded: google_trends__2025-05-23
Loaded: google_trends__2025-05-24
Loaded: google_trends__2025-05-25
Loaded: google_trends__2025-05-26
Loaded: google_trends__2025-05-27
Loaded: google_trends__2025-05-28
Loaded: google_trends__2025-05-29
Loaded: google_trends__2025-05-30
Loaded: google_trends__2025-05-31
Loaded: google_trends__2025-06-01
Loaded: google_trends__2025-06-02
Loaded: google_trends__2025-06-03
Loaded: google_trends__2025-06-04
Loaded: google_trends__2025-06-05
Loaded: google_trends__2025-06-06
Loaded: google_trends__2025-06-07
Loaded: google_trends__2025-06-08
Loaded: google_trends__2025-06-09
Loaded: google_trends__2025-06-10
Loaded: google

In [20]:
# Two blank lines, a 100-character rule, then two more blank lines; printed
# before each DataFrame so consecutive dumps are easy to tell apart.
separator = "\n\n" + "=" * 100 + "\n\n"

# Show the schema and a one-row preview of every loaded DataFrame.
for name, df in dfs.items():
    print(separator)
    print(f"New file: {name}")
    df.printSchema()
    df.show(1)






New file: google_trends__2025-05-13
root
 |-- date: date (nullable = true)
 |-- Prawo i Sprawiedliwość: integer (nullable = true)
 |-- Koalicja Obywatelska: integer (nullable = true)
 |-- Konfederacja: integer (nullable = true)
 |-- Razem: integer (nullable = true)
 |-- Nowa Lewica: integer (nullable = true)
 |-- Karol Nawrocki: integer (nullable = true)
 |-- Rafał Trzaskowski: integer (nullable = true)
 |-- Sławomir Mentzen: integer (nullable = true)
 |-- Adrian Zandberg: integer (nullable = true)
 |-- Magdalena Biejat: integer (nullable = true)

+----------+----------------------+--------------------+------------+-----+-----------+--------------+-----------------+----------------+---------------+----------------+
|      date|Prawo i Sprawiedliwość|Koalicja Obywatelska|Konfederacja|Razem|Nowa Lewica|Karol Nawrocki|Rafał Trzaskowski|Sławomir Mentzen|Adrian Zandberg|Magdalena Biejat|
+----------+----------------------+--------------------+------------+-----+-----------+-------------

In [23]:
from pyspark.sql.functions import col

def normalize_column_names(df):
    """Return `df` with every column name normalised.

    Each name is stripped of surrounding whitespace and lower-cased; spaces
    and "/" become "_", and "." is removed. Non-ASCII letters are kept as-is.

    Args:
        df: A Spark DataFrame (anything exposing `columns` and `toDF(*names)`).

    Returns:
        A DataFrame with the same columns in the same order, renamed.
    """
    normalized = [
        col_name.strip().lower().replace(" ", "_").replace(".", "").replace("/", "_")
        for col_name in df.columns
    ]
    # Rename positionally in one call instead of chaining one withColumnRenamed
    # per column: a single projection, and no dependence on name matching.
    return df.toDF(*normalized)

# Re-bind `dfs` so every loaded DataFrame carries normalised column names;
# the dictionary keys are left untouched.
dfs = {
    key: normalize_column_names(frame)
    for key, frame in dfs.items()
}

In [24]:
from functools import reduce

# Pick the per-day Google Trends frames in sorted key order. The keys end in an
# ISO date (e.g. "google_trends__2025-05-13"), so lexicographic order is also
# chronological and does not depend on the order the files were discovered in.
trend_names = sorted(name for name in dfs if name.startswith("google_trends__"))
trend_dfs = [dfs[name] for name in trend_names]

# reduce() on an empty list raises an opaque TypeError; fail with a clear message.
if not trend_dfs:
    raise ValueError("No DataFrames with a 'google_trends__' key were loaded")

# Stack all daily frames, matching columns by name rather than by position.
combined_trends_df = reduce(lambda a, b: a.unionByName(b), trend_dfs)

combined_trends_df = combined_trends_df.withColumnRenamed("date", "trend_date")
combined_trends_df.show(3)


+----------+----------------------+--------------------+------------+-----+-----------+--------------+-----------------+----------------+---------------+----------------+
|trend_date|prawo_i_sprawiedliwość|koalicja_obywatelska|konfederacja|razem|nowa_lewica|karol_nawrocki|rafał_trzaskowski|sławomir_mentzen|adrian_zandberg|magdalena_biejat|
+----------+----------------------+--------------------+------------+-----+-----------+--------------+-----------------+----------------+---------------+----------------+
|2025-05-13|                    96|                  12|          30|   30|          0|            68|               80|              51|             42|              31|
|2025-05-14|                    81|                  10|          37|   24|          7|            53|               55|              46|             36|              23|
|2025-05-15|                    84|                  11|          39|   28|          7|            60|               59|              50|        

In [27]:
def _rename_required(df, old, new):
    """Rename column `old` to `new`, raising if `old` is not present.

    DataFrame.withColumnRenamed silently does nothing when the source column
    is missing; this wrapper turns that into an explicit KeyError. The check
    ignores case so it is never stricter than Spark's own column matching.
    """
    if old.lower() not in [c.lower() for c in df.columns]:
        raise KeyError(f"Column {old!r} not found; available columns: {df.columns}")
    return df.withColumnRenamed(old, new)


# Ensure dates are standardized: both poll tables expose the date as `poll_date`.
# Index lookup (not .get) so a missing file fails with a KeyError naming the key
# instead of an AttributeError on None.
# NOTE(review): polls_data is renamed from "data" while polls_kandydaci is
# renamed from "date" - presumably "data" is the Polish column name; confirm.
polls_data = _rename_required(dfs["polls_data"], "data", "poll_date")
polls_kandydaci = _rename_required(dfs["polls_kandydaci"], "date", "poll_date")

print(polls_kandydaci)

DataFrame[institute: string, sample: double, poll_date: date, trzaskowski: double, nawrocki: double, nz_bu: double, przewaga: double]


In [31]:
# Index lookup (not .get) so a missing "followers3" file fails with a KeyError
# naming the key instead of an AttributeError on None.
followers_df = dfs["followers3"]
# NOTE(review): the captured output shows "Brak danych" placeholders in
# `followers`, so the column is presumably read as a string - clean and cast
# it before any numeric use.
followers_df.show()


+--------------------+---------+-----------+
|              partia|   portal|  followers|
+--------------------+---------+-----------+
|Prawo i Sprawiedl...| facebook|     436000|
|Koalicja Obywatelska| facebook|     407000|
|        Konfederacja| facebook|     887000|
|               Razem| facebook|     176000|
|         Nowa Lewica| facebook|     162000|
|Prawo i Sprawiedl...|instagram|      46117|
|Koalicja Obywatelska|instagram|     107093|
|        Konfederacja|instagram|     232303|
|               Razem|instagram|     107507|
|         Nowa Lewica|instagram|      80060|
|Prawo i Sprawiedl...|  twitter|Brak danych|
|Koalicja Obywatelska|  twitter|Brak danych|
|        Konfederacja|  twitter|Brak danych|
|               Razem|  twitter|Brak danych|
|         Nowa Lewica|  twitter|Brak danych|
|Prawo i Sprawiedl...|  youtube|Brak danych|
|Koalicja Obywatelska|  youtube|Brak danych|
|        Konfederacja|  youtube|Brak danych|
|               Razem|  youtube|Brak danych|
|         