# Load csv into dataframe

### Pandas

Current implementation: loading the entire DataFrame into memory uses 3.1 GB and takes 7+ minutes.



In [4]:
from pathlib import Path
from typing import List

from dotenv import load_dotenv
import pandas as pd
from datawagon.objects.csv_file_info import CsvFileInfo
from datawagon.objects.csv_loader import CSVLoader
from datawagon.objects.file_utils import FileUtils

# can this notebook be moved to the notebook venv?

# Environment settings are loaded first, before any datawagon helper is created.
load_dotenv(verbose=True)

file_utils = FileUtils()
source_path = Path('/Users/jm/temp/caravan/Caravan Historical Data/2023')

# remove this from class and use scan and filter the group by name instead
csv_files = file_utils.scan_for_csv_files_with_name(source_path, 'claim_raw_')

# One CsvFileInfo per discovered file, in discovery order.
csv_file_infos = list(map(CsvFileInfo.build_data_item, csv_files))

print(csv_file_infos)

# Load each file through CSVLoader; the frames are kept in a list so they can be
# concatenated in one call below.
files: List[pd.DataFrame] = [
    CSVLoader(file_info).load_data() for file_info in csv_file_infos
]

# Stack all per-file frames into one, renumbering the index from zero.
one_df = pd.concat(files, ignore_index=True)

one_df.info()


[CsvFileInfo(file_path=PosixPath('/Users/jm/temp/caravan/Caravan Historical Data/2023/1.23/0_claim_raw_v1-1/YouTube_CaravanAffiliates_M_20230101_claim_raw_v1-1.csv.gz'), file_dir='/Users/jm/temp/caravan/Caravan Historical Data/2023/1.23/0_claim_raw_v1-1', file_name='YouTube_CaravanAffiliates_M_20230101_claim_raw_v1-1.csv.gz', file_name_without_extension='YouTube_CaravanAffiliates_M_20230101_claim_raw_v1-1', content_owner='CaravanAffiliates', file_date_key=20230101, file_date=datetime.date(2023, 1, 1), file_month_end_date=datetime.date(2023, 1, 31), report_date_key=20230131, file_version='v1-1', table_name='claim_raw', file_size_in_bytes=32868194, file_size='31.35 MB'), CsvFileInfo(file_path=PosixPath('/Users/jm/temp/caravan/Caravan Historical Data/2023/1.23/0_claim_raw_v1-1/YouTube_CaravanInc_M_20230101_claim_raw_v1-1.csv.gz'), file_dir='/Users/jm/temp/caravan/Caravan Historical Data/2023/1.23/0_claim_raw_v1-1', file_name='YouTube_CaravanInc_M_20230101_claim_raw_v1-1.csv.gz', file_name

### PySpark



In [2]:
from pyspark.sql.types import (
    StringType,
    IntegerType,
    DateType,
    DecimalType,
)
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_date, lit

# import pyspark.pandas as ps
from functools import reduce
from pathlib import Path
from typing import List

from dotenv import load_dotenv
from datawagon.objects.csv_file_info import CsvFileInfo
from datawagon.objects.file_utils import FileUtils

# Load environment settings via python-dotenv before the project helpers are created.
load_dotenv(verbose=True)

# Project helper used below to scan for the report CSV files.
file_utils = FileUtils()
# Root directory that is scanned for CSV files further down in this cell.
source_path = Path("/Users/jm/temp/caravan/Caravan Historical Data/2023")


def unionAll(*dfs) -> DataFrame:
    """Combine the given DataFrames into one by folding ``DataFrame.union`` left to right.

    A single DataFrame is returned unchanged. Calling this with no arguments
    raises ``TypeError`` because ``reduce`` is given no initial value.
    """
    return reduce(lambda combined, nxt: DataFrame.union(combined, nxt), dfs)


# remove this from class and use scan and filter the group by name instead
csv_files = file_utils.scan_for_csv_files_with_name(source_path, "claim_raw_v1")

# One CsvFileInfo per discovered file, in discovery order.
csv_file_infos = list(map(CsvFileInfo.build_data_item, csv_files))

# Local Spark session using all available cores.
spark = SparkSession.builder.master("local[*]").getOrCreate()

files: List[DataFrame] = []

for csv_info in csv_file_infos:
    # Constant columns that record which file each row came from.
    additional_columns = dict(
        _file_name=lit(csv_info.file_name_without_extension),
        _content_owner=lit(csv_info.content_owner),
        _report_date_key=lit(csv_info.report_date_key),
        _file_load_date=current_date(),
    )

    # Read the file (header row, inferred column types) and attach the metadata.
    df = spark.read.csv(
        str(csv_info.file_path), header=True, inferSchema=True, sep=","
    ).withColumns(additional_columns)
    files.append(df)

# Stack the per-file frames into a single DataFrame.
spark_df = unionAll(*files)

cols = spark_df.columns

# Single-pass character table: space . - / ( each become "_", while ) ? : are
# dropped. Names are lower-cased afterwards.
_column_name_table = str.maketrans(" .-/(", "_____", ")?:")

new_columns = [name.translate(_column_name_table).lower() for name in cols]

# Rename every column positionally to its normalised name.
spark_df = spark_df.toDF(*new_columns)
# schema = StructType()

# Substrings of the (already normalised) column names that select a target type.
float_cols = ["revenue"]
int_cols = ["view", "day", "date_key", "sec"]
date_cols = ["date"]


def _target_type(column_name):
    """Choose the Spark type for a column from substrings of its name.

    The checks run in a fixed order: revenue-like, then integer-like, then
    date-like; any other column becomes a string. The integer check runs before
    the date check, so a name containing "date_key" is cast to an integer.
    """
    if any(name in column_name for name in float_cols):
        # floats will be changed to numeric on load (pandas doesn't support the type)
        return DecimalType(19, 7)
    if any(name in column_name for name in int_cols):
        return IntegerType()
    if any(name in column_name for name in date_cols):
        return DateType()
    return StringType()


# Cast every column in ONE projection. Calling withColumn once per column in a
# loop adds a projection per call and can produce very large query plans; a
# single select with all casts gives the same columns, names and order.
spark_df = spark_df.select(
    [
        spark_df[name].cast(_target_type(name)).alias(name)
        for name in spark_df.columns
    ]
)
# print(schema)
# spark_df = spark_df.to(schema)

# Pearson correlation between the two named columns; as the last expression in
# the cell, its value is what the notebook displays.
spark_df.corr('owned_views','partner_revenue', method="pearson")

# one_df = pd.concat(files, ignore_index=True)
# spark_df = spark.read.concat(files)
# spark_df = spark_df.repartition(1)
# spark_df.write.mode("overwrite").parquet("/Users/jm/temp/caravan/temp_out_file.parquet")
# spark_df.coalesce(1).write.parquet('/Users/jm/temp/caravan/temp_out_file.parquet')
# spark_df.printSchema()
# spark_df.show(5)
# spark_df.show()
# spark_df.count()
# spark_df.describe().show()

                                                                                

0.2444156941198514

### DuckDB