1. Load Data into Spark:

    Load data into Spark from a CSV file.

    Tasks for you:

     - Load data from a JSON file.
     - Load data from a Parquet file.


In [None]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("LoadCSV").getOrCreate()

# Load data from CSV file
df_csv = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("path/to/csv/file.csv")

# Show the data
df_csv.show()

2. Data Transformations:

    Use Spark to transform data.

    Tasks:

     - Filter data based on a specific column.
       - How to use the filter() function to select rows based on a condition

     - Join two datasets.
       - How to use the join() function to combine the two datasets based on a common column
       - Example of joining data where the column names are different in the two datasets

     - Aggregate data using different functions.
       - How to use the groupBy() function to group the data by a specific column
       - Provide examples of different aggregate functions (e.g., sum(), avg(), count()) and show how to apply them to the grouped data

     - Group data by a specific column.
       - How to use the groupBy() function to group the data by a specific column
       - Example of grouping data by multiple columns and applying aggregate functions on the grouped data


In [None]:
# Load data from a CSV file
df = spark.read.csv("data.csv", header=True, inferSchema=True)
# Load data from a second CSV file
df2 = spark.read.csv("data2.csv", header=True, inferSchema=True)

# 1. Filter data based on a condition
filtered_df = df.filter(df['age'] > 25)

# 2. Join the two datasets
joined_df = df.join(df2, on=['id'], how='inner')

# 3. Aggregate data using different functions
agg_df = df.agg({'age': 'mean', 'income': 'sum'})

# 4. Group data by a specific column
grouped_df = df.groupBy('gender').count()


3. Spark SQL:

    Use Spark SQL to interact with data.

    Tasks:

     - Create a temporary view from a DataFrame.
     - Query data using Spark SQL.
     - Perform aggregation using Spark SQL.



In [None]:
# Create a SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Load data into a DataFrame:
df = spark.read.csv("file_path.csv", header=True, inferSchema=True)

# Create a temporary view from the DataFrame:
df.createOrReplaceTempView("temp_view_name")

# Query data using Spark SQL:
result = spark.sql("SELECT * FROM temp_view_name WHERE column_name='value'")
result.show()

# Perform aggregation using Spark SQL:

result = spark.sql("SELECT column_name, AVG(column2) as avg_col2 FROM temp_view_name GROUP BY column_name")
result.show()

# Adjust the column names and criteria to match your data and requirements.


4. Machine Learning with Spark:

    Use Spark to build a simple machine learning model.

    Tasks:

    - Load the data into Spark.
        - Demonstrate how to load data from different sources (e.g., CSV, JSON, Parquet, etc.) into Spark.
    

    - Pre-process the data.
        - Demonstrate how to perform common pre-processing tasks in Spark, such as handling missing values, scaling numerical features, encoding categorical features, etc.


    - Split the data into training and testing datasets.
        - Demonstrate how to split data into training and testing datasets using Spark.


    - Build a simple machine learning model.
        - Demonstrate how to use Spark's machine learning library (MLlib) to build a simple machine learning model, such as linear regression or logistic regression.


    - Evaluate the model performance.
        - Demonstrate how to use Spark's MLlib to evaluate the performance of a machine learning model using metrics such as accuracy, precision, recall, F1-score, etc.

In [None]:
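# Load data into Spark (inferSchema so numeric columns are read as numbers, not strings)
df = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load("path/to/data.csv")

# Pre-process the data
from pyspark.ml.feature import Imputer, StandardScaler, OneHotEncoder, StringIndexer, VectorAssembler

# Handle missing values (Imputer requires numeric input columns)
imputer = Imputer(inputCols=["col1", "col2"], outputCols=["col1_imputed", "col2_imputed"])
df = imputer.fit(df).transform(df)

# Scale numerical features (StandardScaler expects a vector column, so assemble one first)
num_assembler = VectorAssembler(inputCols=["num_col"], outputCol="num_vec")
df = num_assembler.transform(df)
scaler = StandardScaler(inputCol="num_vec", outputCol="scaled_num_col")
df = scaler.fit(df).transform(df)

# Encode categorical features
indexer = StringIndexer(inputCol="cat_col", outputCol="indexed_cat_col")
encoder = OneHotEncoder(inputCols=["indexed_cat_col"], outputCols=["encoded_cat_col"])
df = indexer.fit(df).transform(df)
df = encoder.fit(df).transform(df)

# Combine the processed columns into the single "features" vector the model expects
assembler = VectorAssembler(
    inputCols=["col1_imputed", "col2_imputed", "scaled_num_col", "encoded_cat_col"],
    outputCol="features")
df = assembler.transform(df)

# Split data into training and testing datasets (fixed seed for reproducibility)
(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=42)

# Build a simple machine learning model
from pyspark.ml.classification import LogisticRegression

# Assumes the data contains a numeric binary column named "label"
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(trainingData)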

# Evaluate the model performance
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(testData)
evaluator = BinaryClassificationEvaluator(labelCol="label")  # default metric: areaUnderROC
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

5. Spark Streaming:

    Use Spark to process data in real-time.

    Tasks:

    - Set up a Spark Streaming application.
        - How to create a Spark Streaming application using the SparkContext and StreamingContext classes.
        - Explain the different configuration options available for Spark Streaming applications.

    - Read data from a streaming source (e.g., Kafka, Flume, HDFS, etc.).
        - How to read data from a streaming source using the appropriate input DStream (e.g., KafkaUtils.createDirectStream).

    - Process the data in real-time (e.g., map, filter, reduceByKey, etc.).
        - How to apply these operations to streaming data using the appropriate DStream functions.

    - Write the output to a file or a database (e.g., writing to a file, writing to a database, etc.).
        - Show how to write the output of a Spark Streaming application using a DStream output operation (e.g., the saveAsTextFiles function).

In [None]:
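from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Note: pyspark.streaming.kafka is only available in Spark 2.x (with the
# spark-streaming-kafka-0-8 package); it was removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils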

# Set up the Spark Streaming context with a batch interval of 5 seconds
sc = SparkContext("local[*]", "StreamingExample")
ssc = StreamingContext(sc, 5)

# Read data from a Kafka topic
kafkaParams = {"metadata.broker.list": "localhost:9092"}
stream = KafkaUtils.createDirectStream(ssc, ["my_topic"], kafkaParams)

# Process the data in real-time
words = stream.flatMap(lambda x: x[1].split(" "))
word_counts = words.countByValue()

# Write the output to a file
word_counts.repartition(1).saveAsTextFiles("/output/directory/output")

# Start the streaming context
ssc.start()
ssc.awaitTermination()


6. Performance Tuning:

    Optimize Spark performance.

    Tasks:

    - Optimize Spark configuration settings.
    - Use caching to improve performance.
    - Use broadcast variables to optimize join operations.
    - Use partitioning to improve data processing.


In [None]:
from pyspark.sql import SparkSession

# Set up a SparkSession
spark = SparkSession.builder.appName("performance-tuning").getOrCreate()

# Load some data from a file
data = spark.read.csv("data.csv", header=True, inferSchema=True)

# Cache the data for faster access
data.cache()

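# Use a broadcast join to optimize a join against a small lookup table
# (hypothetical lookup data; assumes `data` has a column named "key")
from pyspark.sql.functions import broadcast
lookup_df = spark.createDataFrame([("key1", "value1"), ("key2", "value2")], ["key", "value"])
joined_data = data.join(broadcast(lookup_df), on="key", how="left")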

# Use partitioning to optimize a data processing pipeline
partitioned_data = data.repartition(4)
processed_data = partitioned_data.filter("col1 > 10").groupBy("col2").count()

# Stop the SparkSession
spark.stop()
