# Spark CSV Read Demo

## Introduction

This notebook was created by [Jupyter AI](https://github.com/jupyterlab/jupyter-ai) with the following prompt:

> /generate demonstrate reading a CSV file using Spark

This Jupyter notebook demonstrates how to read a CSV file with Apache Spark. It creates a Spark session, loads a CSV file into a DataFrame using Spark's read API, and inspects the schema, basic statistics, and sample rows of the loaded data. An optional section covers common data processing tasks (filtering, aggregation, and column selection), and the final section saves the processed DataFrame to CSV, Parquet, JSON, or ORC.
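
The file paths used in the cells below are placeholders. To try the notebook without your own data, a small sample file can be generated first. The following is a minimal sketch using only the Python standard library; the helper name `write_sample_csv` and the row values are made up for illustration, and the column names simply mirror the ones referenced in the processing section.

```python
import csv
from pathlib import Path

# Hypothetical sample rows: the column names mirror those used in the
# processing examples later in this notebook; the values are invented.
SAMPLE_ROWS = [
    {"column1": 3, "column2": "value", "some_column": "a", "amount": 10.5},
    {"column1": 7, "column2": "value", "some_column": "a", "amount": 4.0},
    {"column1": 9, "column2": "other", "some_column": "b", "amount": 2.5},
    {"column1": 12, "column2": "value", "some_column": "b", "amount": 8.0},
]


def write_sample_csv(path="data.csv", rows=SAMPLE_ROWS):
    """Write `rows` to `path` as a CSV file with a header line."""
    path = Path(path)
    with path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    return path
```

Calling `write_sample_csv()` creates `data.csv` in the working directory; pass a different path to match whichever placeholder path you replace.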

## Load Data from CSV

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("CSV Data Reader") \
    .getOrCreate()

In [None]:
csv_file_path = "path/to/your/data.csv"

In [None]:
df = spark.read.options(header="true", inferSchema="true").csv(csv_file_path)

In [None]:
df.show(5)
df.printSchema()
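
With `inferSchema` enabled, Spark makes an extra pass over the file to guess column types. When the types are known in advance, an explicit schema avoids that pass and keeps the types stable between runs. The sketch below assumes an existing `spark` session; the helper name `read_csv_with_schema` and the columns in `SCHEMA_DDL` are hypothetical and need to be adapted to your file.

```python
# Hypothetical column names and types; adjust them to match your file.
SCHEMA_DDL = "column1 INT, column2 STRING, some_column STRING, amount DOUBLE"


def read_csv_with_schema(spark, path, schema=SCHEMA_DDL):
    """Read a CSV file using an explicit schema instead of inferSchema.

    `spark` is an existing SparkSession, such as the one created above.
    """
    return (
        spark.read
        .option("header", "true")  # first line holds the column names
        .schema(schema)            # accepts a StructType or a DDL string
        .csv(path)
    )


# Usage, with the session and path defined in the cells above:
# df = read_csv_with_schema(spark, csv_file_path)
```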

In [None]:
spark.stop()

## Inspect Data

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("Read CSV and Inspect Data").getOrCreate()

In [None]:
# Without inferSchema every column is read as a string
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("./data.csv")

In [None]:
print("Schema of the DataFrame:")
df.printSchema()

In [None]:
print("\nBasic Statistics (if numerical columns are present):")
try:
    df.describe().show()
except Exception as e:
    print(f"Could not generate statistics: {e}")

In [None]:
print("\nSample Rows:")
df.show(5)
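
Beyond the schema, statistics, and sample rows, a count of missing values per column is often a useful inspection step. A minimal sketch, assuming `df` is the DataFrame loaded above; `null_counts` is a hypothetical helper name. It runs one Spark job per column, so it is best suited to small or exploratory datasets.

```python
def null_counts(df):
    """Return {column name: number of null values} for a Spark DataFrame."""
    return {
        name: df.filter(df[name].isNull()).count()  # one job per column
        for name in df.columns
    }


# Usage, with the DataFrame loaded above:
# print(null_counts(df))
```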

## Data Processing (Optional)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as sum_col

In [None]:
# Initialize a Spark session
spark = SparkSession.builder.appName("CSV Data Processing").getOrCreate()

In [None]:
# Load the CSV file into a DataFrame
csv_file_path = "path/to/your/csvfile.csv"  # Replace with your actual CSV path
data_df = spark.read.options(header=True, inferSchema=True).csv(csv_file_path)

In [None]:
# Example: Filter data based on conditions
filtered_df = data_df.filter((col("column1") > 5) & (col("column2") == "value"))

In [None]:
# Show the first few rows of filtered DataFrame
filtered_df.show()

In [None]:
# Example: Aggregate data using groupBy and sum functions
aggregated_df = data_df.groupBy("some_column").agg(sum_col(col("amount")).alias("total_amount"))

In [None]:
# Show the results of aggregation
aggregated_df.show()
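
The same aggregation can also be written in Spark SQL by registering the DataFrame as a temporary view. This is a sketch under the same assumptions as the cell above (columns named `some_column` and `amount`); the helper name `total_amount_by_group_sql` and the view name `data` are hypothetical.

```python
def total_amount_by_group_sql(spark, df, view_name="data"):
    """Sum `amount` per `some_column` using Spark SQL.

    `spark` is an existing SparkSession and `df` a DataFrame that has the
    placeholder columns `some_column` and `amount` used above.
    """
    # Register the DataFrame under a temporary, session-scoped view name
    df.createOrReplaceTempView(view_name)
    return spark.sql(
        f"SELECT some_column, SUM(amount) AS total_amount "
        f"FROM {view_name} GROUP BY some_column"
    )


# Usage, with the session and DataFrame defined above:
# total_amount_by_group_sql(spark, data_df).show()
```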

In [None]:
# Example: Transformation - Selecting specific columns
selected_columns_df = data_df.select("column1", "column2")

In [None]:
# Display selected columns DataFrame
selected_columns_df.show()
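
Selecting columns is the simplest transformation; deriving a new column from an existing one is another common case. A minimal sketch, assuming the DataFrame has a numeric `amount` column as in the aggregation example; the helper name `add_doubled_amount` and the output column `amount_doubled` are hypothetical.

```python
def add_doubled_amount(df):
    """Return a new DataFrame with an extra column holding `amount` * 2."""
    # withColumn does not modify `df`; it returns a new DataFrame
    return df.withColumn("amount_doubled", df["amount"] * 2)


# Usage, with the DataFrame loaded above:
# add_doubled_amount(data_df).show()
```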

In [None]:
# Stop the Spark session when done
spark.stop()

## Save Processed Data

In [None]:
from pyspark.sql import SparkSession

In [None]:
# getOrCreate() reuses the active session or starts a new one;
# a new one is needed here if spark.stop() was called earlier
spark = SparkSession.builder.appName("Save Processed Data").getOrCreate()

In [None]:
processed_df = ...  # Placeholder: replace ... with the DataFrame you want to save

In [None]:
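# Spark writes a directory of part files at this path, not a single CSV file
output_path_csv = "path/to/output/folder/processed_data.csv"
processed_df.write.mode("overwrite").option("header", "true").csv(output_path_csv)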

In [None]:
output_path_parquet = "path/to/output/folder/processed_data.parquet"
processed_df.write.mode("overwrite").parquet(output_path_parquet)

In [None]:
output_path_json = "path/to/output/folder/processed_data.json"
processed_df.write.mode("overwrite").json(output_path_json)

In [None]:
output_path_orc = "path/to/output/folder/processed_data.orc"
processed_df.write.mode("overwrite").orc(output_path_orc)
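
Each writer call above produces a directory of part files at the given path rather than a single file. When one CSV file is needed and the data is small, the partitions can be merged before writing. The following is a sketch only; `write_single_csv` is a hypothetical helper name, and `coalesce(1)` funnels all rows through a single task, so it is not suitable for large outputs.

```python
def write_single_csv(df, output_dir):
    """Write `df` as one CSV part file (with header) inside `output_dir`."""
    (
        df.coalesce(1)             # merge all partitions into one
        .write.mode("overwrite")   # replace any existing output directory
        .option("header", "true")  # include column names in the file
        .csv(output_dir)
    )


# Usage, with the DataFrame defined above:
# write_single_csv(processed_df, "path/to/output/folder/processed_single_csv")
```

The result is still a directory, but it contains a single `part-*.csv` file alongside Spark's marker files.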