In [None]:
1. Working with RDDs:
   a) Write a Python program to create an RDD from a local data source.
   from pyspark import SparkContext

def create_rdd_from_local(data=None):
    """Create an RDD by parallelizing an in-memory Python collection.

    Args:
        data: Optional collection of elements to distribute. Defaults to the
            sample words ``["Spark", "Python", "RDD", "Example"]``.

    Returns:
        A ``pyspark.RDD`` containing the elements of ``data``.

    The SparkContext is obtained with ``SparkContext.getOrCreate`` rather than
    constructed directly. Only one SparkContext may be active per process, so
    calling ``SparkContext(...)`` a second time (as the later examples in this
    file do by calling this function again) raises ``ValueError``. When a
    context already exists it is reused, and the master/app-name settings
    below are ignored.
    """
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local").setAppName("RDD Example")
    sc = SparkContext.getOrCreate(conf)
    if data is None:
        data = ["Spark", "Python", "RDD", "Example"]
    return sc.parallelize(data)

# Entry point for part 1a: build the sample RDD and print its contents.
if __name__ == "__main__":
    rdd = create_rdd_from_local()
    # collect() is an action: it pulls every element back to the driver as a list.
    print(rdd.collect())

   b) Implement transformations and actions on the RDD to perform data processing tasks.
   def process_rdd(rdd):
    # Transformations
    rdd_upper = rdd.map(lambda x: x.upper())
    rdd_filtered = rdd_upper.filter(lambda x: "A" in x)

    # Actions
    result = rdd_filtered.collect()
    return result

# Entry point for part 1b: run the map/filter pipeline over the sample RDD.
if __name__ == "__main__":
    rdd = create_rdd_from_local()
    processed_data = process_rdd(rdd)
    print(processed_data)

   c) Analyze and manipulate data using RDD operations such as map, filter, reduce, or aggregate.
   def analyze_rdd(rdd):
    # Mapping each word to a tuple (word, 1)
    word_counts = rdd.map(lambda word: (word, 1))

    # Reducing by key to count the occurrences of each word
    reduced_word_counts = word_counts.reduceByKey(lambda a, b: a + b)

    return reduced_word_counts.collect()

# Entry point for part 1c: print (word, count) pairs for the sample RDD.
if __name__ == "__main__":
    rdd = create_rdd_from_local()
    word_counts = analyze_rdd(rdd)
    print(word_counts)

2. Spark DataFrame Operations:
   a) Write a Python program to load a CSV file into a Spark DataFrame.
from pyspark.sql import SparkSession

def load_csv_to_dataframe(file_path):
    """Read the CSV file at *file_path* into a Spark DataFrame.

    The first line is used as the column names, and column types are inferred
    from the data (Spark makes an extra pass over the file to do this). An
    already-running SparkSession is reused if there is one.

    Args:
        file_path: Path or URI of the CSV file.

    Returns:
        The loaded DataFrame.
    """
    reader_options = {"header": True, "inferSchema": True}
    session = SparkSession.builder.appName("DataFrame Example").getOrCreate()
    return session.read.csv(file_path, **reader_options)

# Entry point for part 2a: load data.csv into a DataFrame and print a preview.
if __name__ == "__main__":
    df = load_csv_to_dataframe("data.csv")
    df.show()

   b) Perform common DataFrame operations such as filtering, grouping, or joining.

   def dataframe_operations(df):
    # Filtering
    filtered_df = df.filter(df['age'] > 30)

    # Grouping
    grouped_df = df.groupBy('department').count()

    # Joining (assuming another DataFrame df2)
    df2 = df.withColumnRenamed('id', 'employee_id')  # Example DataFrame
    joined_df = df.join(df2, df['id'] == df2['employee_id'])

    return filtered_df, grouped_df, joined_df

# Entry point for part 2b: show the filtered, grouped and joined DataFrames.
# NOTE(review): dataframe_operations reads the id, age and department columns,
# so data.csv presumably provides them; verify against the actual file.
if __name__ == "__main__":
    df = load_csv_to_dataframe("data.csv")
    filtered_df, grouped_df, joined_df = dataframe_operations(df)

    filtered_df.show()
    grouped_df.show()
    joined_df.show()

   c) Apply Spark SQL queries on the DataFrame to extract insights from the data.

   def apply_spark_sql(df):
    df.createOrReplaceTempView("employees")
    spark = df.sparkSession

    # Example SQL query
    result_df = spark.sql("SELECT department, AVG(salary) as avg_salary FROM employees GROUP BY department")

    return result_df

# Entry point for part 2c: print the average salary per department via Spark SQL.
# NOTE(review): the query reads department and salary, so data.csv presumably
# has those columns; verify against the actual file.
if __name__ == "__main__":
    df = load_csv_to_dataframe("data.csv")
    result_df = apply_spark_sql(df)
    result_df.show()

3. Spark Streaming:
   a) Write a Python program to create a Spark Streaming application.

  from pyspark.streaming import StreamingContext

def create_spark_streaming_app(batch_interval=1):
    """Create a (not yet started) StreamingContext on a local Spark context.

    Args:
        batch_interval: Micro-batch duration in seconds (default 1).

    Returns:
        A new ``StreamingContext``. Callers attach input streams and output
        operations, then call ``start()``.

    ``SparkContext.getOrCreate`` reuses an already-running SparkContext
    instead of failing with ``ValueError`` like ``SparkContext(...)`` does.
    ``local[2]`` is requested because a receiver-based source (such as a
    socket stream) occupies one core and needs at least one more to process
    the received data.
    NOTE(review): if a context already exists (e.g. one created with master
    "local"), it is reused as-is and these settings are ignored. With a single
    core, the receiver may starve batch processing.
    """
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local[2]").setAppName("Streaming Example")
    sc = SparkContext.getOrCreate(conf)
    return StreamingContext(sc, batch_interval)

# Entry point for part 3a: create the streaming context.
if __name__ == "__main__":
    ssc = create_spark_streaming_app()
    # Spark refuses to start a StreamingContext with no output operations
    # registered ("No output operations registered, so nothing to execute").
    # No input stream exists yet at this step, so only confirm the context was
    # created, then stop it. The SparkContext is kept alive, so the following
    # parts can build a fresh StreamingContext with a source and outputs
    # before calling start().
    print("StreamingContext created:", ssc)
    ssc.stop(stopSparkContext=False)

   b) Configure the application to consume data from a streaming source (e.g., Kafka or a socket).

   def consume_data_from_socket(ssc, hostname, port):
    lines = ssc.socketTextStream(hostname, port)
    return lines

# Entry point for part 3b: echo the lines received on localhost:9999.
# A text server must be listening on that port (e.g. `nc -lk 9999`).
if __name__ == "__main__":
    ssc = create_spark_streaming_app()
    lines = consume_data_from_socket(ssc, "localhost", 9999)
    # pprint() registers the output operation. It must be attached before
    # start(), because a started context cannot gain new computations.
    lines.pprint()

    ssc.start()
    # Blocks until the streaming context is stopped or fails.
    ssc.awaitTermination()

   c) Implement streaming transformations and actions to process and analyze the incoming data stream.

   def process_stream(lines):
    words = lines.flatMap(lambda line: line.split(" "))
    word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    word_counts.pprint()

# Entry point for part 3c: print a running per-batch word count of the text
# received on localhost:9999.
if __name__ == "__main__":
    ssc = create_spark_streaming_app()
    lines = consume_data_from_socket(ssc, "localhost", 9999)
    # Registers the word-count pipeline and its pprint() output; this must
    # happen before start().
    process_stream(lines)

    ssc.start()
    # Blocks until the streaming context is stopped or fails.
    ssc.awaitTermination()


4. Spark SQL and Data Source Integration:
   a) Write a Python program to connect Spark with a relational database (e.g., MySQL, PostgreSQL).

   def connect_to_database():
    spark = SparkSession.builder.appName("Database Integration").getOrCreate()
    jdbc_url = "jdbc:mysql://localhost:3306/db_name"
    properties = {
        "user": "username",
        "password": "password",
        "driver": "com.mysql.jdbc.Driver"
    }
    df = spark.read.jdbc(jdbc_url, "table_name", properties=properties)
    return df

# Entry point for part 4a: read the table over JDBC and print a preview.
if __name__ == "__main__":
    df = connect_to_database()
    df.show()

   b) Perform SQL operations on the data stored in the database using Spark SQL.

def perform_sql_operations(df):
    """Select the rows of *df* whose age exceeds 30, using Spark SQL.

    Registers *df* as the temporary view ``table_name`` (replacing any existing
    view of that name) and queries it through the DataFrame's own session.

    Args:
        df: DataFrame with an ``age`` column.

    Returns:
        A lazily evaluated DataFrame with every column of the matching rows.
    """
    session = df.sparkSession
    df.createOrReplaceTempView("table_name")
    return session.sql("SELECT * FROM table_name WHERE age > 30")

# Entry point for part 4b: filter the JDBC table (age > 30) with Spark SQL.
if __name__ == "__main__":
    df = connect_to_database()
    result_df = perform_sql_operations(df)
    result_df.show()

   c) Explore the integration capabilities of Spark with other data sources, such as the Hadoop Distributed File System (HDFS) or Amazon S3.

   def load_data_from_hdfs():
    spark = SparkSession.builder.appName("HDFS Integration").getOrCreate()
    hdfs_path = "hdfs://localhost:9000/user/hadoop/data.csv"
    df = spark.read.csv(hdfs_path, header=True, inferSchema=True)
    return df

def load_data_from_s3(s3_path="s3a://my-bucket/data.csv"):
    """Read a CSV file stored in Amazon S3 into a Spark DataFrame.

    Args:
        s3_path: ``s3a://`` URI of the CSV file. The default is a placeholder
            bucket.

    Returns:
        The loaded DataFrame, with the first line used as column names and
        column types inferred from the data.

    Reading ``s3a://`` paths requires the ``hadoop-aws`` connector on Spark's
    classpath and AWS credentials configured for Hadoop. An already-running
    SparkSession is reused, in which case the app name set here does not
    rename it.
    """
    spark = SparkSession.builder.appName("S3 Integration").getOrCreate()
    return spark.read.csv(s3_path, header=True, inferSchema=True)

# Entry point for part 4c: read data.csv from HDFS and from S3 and preview both.
if __name__ == "__main__":
    df_hdfs = load_data_from_hdfs()
    df_hdfs.show()

    df_s3 = load_data_from_s3()
    df_s3.show()
