### Foundations, I/O & Profiling


In [0]:

# Dataset folder in DBFS that every later cell reads from / writes to.
DATASETS_DIR = "/FileStore/datasets"

# Either upload authors_publications.csv into this folder manually,
# or run the next cell to generate a sample file there.
dbutils.fs.mkdirs(DATASETS_DIR)


In [0]:
# Generate a small synthetic sample file so the notebook can run end-to-end.
# Skip this cell if you uploaded your own authors_publications.csv instead.
import pandas as pd

COUNTRIES = ["USA", "UK", "India", "Germany", "Japan", "Australia", "France", "Canada", "China", "Brazil"]


def _make_record(i):
    """Return one synthetic publication row for the 1-based index ``i``."""
    return {
        "Authors": f"Author_{i}",
        "Publication": f"Research Paper {i}",
        "Year": 2015 + (i % 10),               # cycles through 2015..2024
        "Citations": (i * 7) % 150,            # deterministic spread in 0..149
        "Country": COUNTRIES[i % len(COUNTRIES)],
    }


data = [_make_record(i) for i in range(1, 51)]  # 50 rows

df = pd.DataFrame(data)
# The /dbfs/ prefix is the local-file view of DBFS, so pandas can write there directly.
df.to_csv("/dbfs/FileStore/datasets/authors_publications.csv", index=False)
display(df.head())


## STEP 1: Read CSV vs Parquet

In [0]:

# Read the raw CSV. CSV carries no type information, so without inferSchema every
# column (including Year and Citations) would be loaded as a string. The Parquet copy
# would inherit those string types, and the numeric Citations filter in Step 3 would
# then need an implicit cast, which generally keeps it from being pushed down to the
# Parquet reader. inferSchema costs one extra pass over the (small) CSV.
csv_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/FileStore/datasets/authors_publications.csv")
)
display(csv_df)

# Write a typed, columnar copy; Parquet stores the schema alongside the data.
csv_df.write.mode("overwrite").parquet("/FileStore/datasets/authors_publications_parquet")

parquet_df = spark.read.parquet("/FileStore/datasets/authors_publications_parquet")
display(parquet_df)

# Compare the two schemas: both should now show integer Year / Citations columns.
print("CSV Schema:")
csv_df.printSchema()

print("Parquet Schema:")
parquet_df.printSchema()


In [0]:
# Re-load the Parquet copy on its own, so later steps can be re-run from here
# without repeating the CSV read and write above.
parquet_path = "/FileStore/datasets/authors_publications_parquet"
parquet_df = spark.read.format("parquet").load(parquet_path)
display(parquet_df)


## STEP 2: File Sizing & Partitioning

In [0]:

# Inspect the part-files Spark produced for the default Parquet write.
default_write_path = "/FileStore/datasets/authors_publications_parquet"
display(dbutils.fs.ls(default_write_path))

# repartition(2) shuffles the rows into 2 in-memory partitions; each non-empty
# partition is written as its own part-file. This controls the number of output
# files, not a directory layout (that would be .partitionBy(...)).
two_partition_path = "/FileStore/datasets/partitioned_parquet"
(
    csv_df
    .repartition(2)
    .write
    .mode("overwrite")
    .parquet(two_partition_path)
)
display(dbutils.fs.ls(two_partition_path))


## STEP 3: Predicate Pushdown & Column Pruning

In [0]:

# Predicate pushdown: the filter can be handed to the Parquet reader, which can skip
# data using file/row-group statistics instead of filtering every row in Spark.
filtered_df = parquet_df.filter(parquet_df["Citations"] > 20)
display(filtered_df)
# Evidence: the FileScan node's "PushedFilters" entry shows whether the predicate reached
# the source. If Citations is a string column, the comparison needs a cast and may not
# appear there.
filtered_df.explain()

# Column pruning: only the selected columns are read from the columnar files.
columns_pruned_df = parquet_df.select("Authors", "Citations")
display(columns_pruned_df)
# Evidence: the FileScan node's "ReadSchema" lists only the columns actually scanned.
columns_pruned_df.explain()


## STEP 4: Caching & Persistence

In [0]:

parquet_df.cache()
parquet_df.count()  # triggers cache

# Measure improvement
%time parquet_df.count()

from pyspark import StorageLevel
parquet_df.persist(StorageLevel.MEMORY_AND_DISK)
parquet_df.count()

parquet_df.unpersist()


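## STEP 5: Diagnose I/O Bottlenecks in Spark UI
Explore the SQL, Storage, and Executors tabs to analyze performance. The Storage tab only lists data that is currently persisted. Step 4 ends with `parquet_df.unpersist()`, so to see an entry there, first re-run `parquet_df.cache()` followed by an action such as `parquet_df.count()`.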

In [0]:
# Check whether parquet_df is currently marked as cached/persisted.
parquet_cache_flag = parquet_df.is_cached
parquet_cache_flag

