# 1.) Working with RDDs:
   a) Write a Python program to create an RDD from a local data source.

In [None]:
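from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Create (or reuse) a SparkSession and get its SparkContext
spark=SparkSession.builder.appName("Practice").getOrCreate()
sc=spark.sparkContext
# Distribute a local Python list across the cluster as an RDD
numberrdd=sc.parallelize([1,2,3,4,5])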
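`parallelize` splits the local list into partitions that Spark processes in parallel. The plain-Python sketch below is only an illustration: it models the even-slicing rule of Spark's Scala `ParallelCollectionRDD` (slice `i` covers indices `i*n//k` up to `(i+1)*n//k`), and the layout PySpark actually produces can differ because it batches elements when serializing them. The helper name `slice_like_parallelize` is hypothetical. On a real RDD you can inspect the layout with `sc.parallelize(data, 2).glom().collect()`.

```python
def slice_like_parallelize(data, num_slices):
    """Split a local list into num_slices contiguous chunks (illustrative model, not PySpark)."""
    n = len(data)
    # Chunk i runs from i*n//k (inclusive) to (i+1)*n//k (exclusive)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


partitions = slice_like_parallelize([1, 2, 3, 4, 5], 2)
print(partitions)  # [[1, 2], [3, 4, 5]]
```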

b) Implement transformations and actions on the RDD to perform data processing tasks.

In [None]:
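wordsrdd=sc.parallelize(['hello','hi','hello'])
# Transformations (lazy): pair each word with 1, then add up the 1s for each word
newrdd=wordsrdd.map(lambda x: (x,1)).reduceByKey(lambda a,b:a+b)
# Action: collect() triggers the computation and returns the results to the driver
print(newrdd.collect())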
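The word count needs a merge function per key, which is what `reduceByKey` provides; `groupByKey` only gathers the values for each key and does not take a merge function. The single-machine sketch below (the helper `reduce_by_key` is hypothetical, not part of PySpark) shows the result that `reduceByKey(lambda a, b: a + b)` computes. Spark additionally merges values inside each partition before the shuffle, a step this model skips.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Merge the values of each key with func, like RDD.reduceByKey (single-machine model)."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    # Fold each key's list of values into one value with the merge function
    return {key: reduce(func, values) for key, values in grouped.items()}


words = ['hello', 'hi', 'hello']
counts = reduce_by_key(((w, 1) for w in words), lambda a, b: a + b)
print(counts)  # {'hello': 2, 'hi': 1}
```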

c) Analyze and manipulate data using RDD operations such as map, filter, reduce, or aggregate.

In [None]:
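#filter transformation: keep only the even numbers
filterrdd=numberrdd.filter(lambda x: x%2==0)
print(filterrdd.collect())

#map transformation: turn each word into a (word, 1) pair
wordsrdd=sc.parallelize(['hello','hi','hello'])
pairsrdd=wordsrdd.map(lambda x: (x,1))
print(pairsrdd.collect())

#reduce action: total number of characters across all words
no_of_char=wordsrdd.map(lambda x: len(x)).reduce(lambda a,b:a+b)
print(no_of_char)

#aggregate action: zero value 0, seqOp adds within a partition, combOp merges partition results
sum_of_num=numberrdd.aggregate(0, lambda acc,x: acc+x, lambda a,b: a+b)
print(sum_of_num)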
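`RDD.aggregate(zeroValue, seqOp, combOp)` folds each partition with `seqOp` starting from `zeroValue`, then merges the per-partition results with `combOp`. The plain-Python model below, assuming a hypothetical two-partition layout, computes a (sum, count) pair in a single pass, which `sum()` or `reduce()` alone cannot do because the accumulator has a different type than the elements. In PySpark the zero value is also used in the merge step, so it must be a neutral element for `combOp`.

```python
from functools import reduce


def aggregate(partitions, zero_value, seq_op, comb_op):
    """Model of RDD.aggregate: fold each partition from zero_value, then merge the partials."""
    partials = []
    for part in partitions:
        acc = zero_value
        for x in part:
            acc = seq_op(acc, x)  # seqOp: add one element to the accumulator
        partials.append(acc)
    # combOp merges the partition results; the merge also starts from zero_value
    return reduce(comb_op, partials, zero_value)


# Hypothetical partition layout for [1, 2, 3, 4, 5]
partitions = [[1, 2], [3, 4, 5]]
total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # update (sum, count) with one element
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # merge two (sum, count) pairs
)
print(total, count, total / count)  # 15 5 3.0
```

The equivalent PySpark call would pass the same zero value and lambdas to `numberrdd.aggregate(...)`.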

# 2.) Spark DataFrame Operations:
   a) Write a Python program to load a CSV file into a Spark DataFrame.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark=SparkSession.builder.appName("Dataframe_practice").getOrCreate()
Employeesdf=spark.read.option('Header',True).option('InferSchema',True).csv('/spark_sample/Employees.txt')

b) Perform common DataFrame operations such as filtering, grouping, or joining.

In [None]:
#Joining operation:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *

spark=SparkSession.builder.appName("Dataframe_practice").getOrCreate()
Employeesdf=spark.read.option('Header',True).option('InferSchema',True).csv('/spark_sample/Employees.txt')
Departmentdf=spark.read.option('Header',True).option('InferSchema',True).csv('/spark_sample/Department.txt')
# Inner join: keep only employees whose DEPARTMENT_ID matches a department
Employeesdf.join(Departmentdf,Employeesdf.DEPARTMENT_ID == Departmentdf.DEPARTMENT_ID,"inner").select(Employeesdf.EMPLOYEE_ID,Employeesdf.FIRST_NAME,Departmentdf.DEPARTMENT_ID).show(10)


#Grouping operation: total salary per department (the aggregate function is sum, not SUM)
groupeddf=Employeesdf.groupBy(col("DEPARTMENT_ID")).agg(F.sum("SALARY").alias("TOTAL_SALARY"))
groupeddf.show()

#Filtering operation: employees in department 10 (SQL expression string)
Filterddf=Employeesdf.filter('DEPARTMENT_ID==10')
Filterddf.show()
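To make the semantics of the three operations concrete, here is a plain-Python model over hypothetical rows (the real contents of `Employees.txt` and `Department.txt` are not shown, and `DEPARTMENT_NAME` is an assumed column). It reflects two Spark SQL behaviors: an equality join condition never matches a null key, so such employees drop out of an inner join, while `groupBy` keeps null as a group of its own. The helpers `inner_join` and `sum_by` are illustrative only.

```python
from collections import defaultdict

# Hypothetical sample rows
employees = [
    {"EMPLOYEE_ID": 100, "FIRST_NAME": "Ann", "DEPARTMENT_ID": 10, "SALARY": 5000},
    {"EMPLOYEE_ID": 101, "FIRST_NAME": "Bob", "DEPARTMENT_ID": 20, "SALARY": 4000},
    {"EMPLOYEE_ID": 102, "FIRST_NAME": "Cho", "DEPARTMENT_ID": 10, "SALARY": 3000},
    {"EMPLOYEE_ID": 103, "FIRST_NAME": "Dee", "DEPARTMENT_ID": None, "SALARY": 2000},
]
departments = [
    {"DEPARTMENT_ID": 10, "DEPARTMENT_NAME": "Admin"},
    {"DEPARTMENT_ID": 20, "DEPARTMENT_NAME": "Sales"},
    {"DEPARTMENT_ID": 30, "DEPARTMENT_NAME": "IT"},
]


def inner_join(left, right, key):
    """Pair rows whose key values are equal; None (null) keys never match."""
    index = defaultdict(list)
    for r in right:
        if r[key] is not None:
            index[r[key]].append(r)
    # Merging the dicts keeps one DEPARTMENT_ID; Spark would keep both join columns
    return [{**l, **r} for l in left if l[key] is not None for r in index[l[key]]]


def sum_by(rows, key, value):
    """Sum `value` per distinct `key`, like groupBy(key).agg(sum(value)); None is its own group."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[key]] += row[value]
    return dict(totals)


joined = inner_join(employees, departments, "DEPARTMENT_ID")
print([(r["EMPLOYEE_ID"], r["DEPARTMENT_NAME"]) for r in joined])
# [(100, 'Admin'), (101, 'Sales'), (102, 'Admin')]
print(sum_by(employees, "DEPARTMENT_ID", "SALARY"))
# {10: 8000, 20: 4000, None: 2000}
print([e["EMPLOYEE_ID"] for e in employees if e["DEPARTMENT_ID"] == 10])
# [100, 102]
```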

c) Apply Spark SQL queries on the DataFrame to extract insights from the data.

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL
Employeesdf.createOrReplaceTempView("Employees")
spark.sql('select FIRST_NAME, SALARY from Employees where DEPARTMENT_ID=10 order by SALARY').show()

# 3.) Spark Streaming:
   a) Write a Python program to create a Spark Streaming application.

In [None]:
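from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# local[2]: at least two threads, so a receiver-based source (e.g. a socket) cannot starve processing
conf = SparkConf().setMaster("local[2]").setAppName("SparkStreamingApp")
# getOrCreate reuses an already running SparkContext instead of failing with a second one
sc = SparkContext.getOrCreate(conf)
# Batch interval of 1 second: the stream is processed as a sequence of 1-second micro-batches
ssc = StreamingContext(sc, 1)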

b) Configure the application to consume data from a streaming source (e.g., Kafka or a socket).

In [None]:
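# pyspark.streaming.kafka exists only up to Spark 2.4 (it was removed in Spark 3.0)
# and needs the spark-streaming-kafka-0-8 package on the classpath
from pyspark.streaming.kafka import KafkaUtils

# Reuse the sc/ssc created above: creating a second SparkContext in the same process fails
kafka_params = {
    "bootstrap.servers": "localhost:9092",  # Kafka broker(s) address
    "group.id": "spark-streaming-app",  # Consumer group ID
    "auto.offset.reset": "largest"  # Start from the latest offset ("smallest"/"largest" in the 0.8 consumer API)
}

# Direct (receiver-less) stream; each record arrives as a (key, value) tuple
kafka_stream = KafkaUtils.createDirectStream(ssc, ["test-topic"], kafka_params)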

c) Implement streaming transformations and actions to process and analyze the incoming data stream.

In [None]:
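# Keep only the message value of each (key, value) record
lines = kafka_stream.map(lambda x: x[1])
# Split every message into words
words = lines.flatMap(lambda line: line.split(" "))
# Count each word within the current micro-batch
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# Output operation: print the first 10 elements of each batch's result
word_counts.pprint()
ssc.start()             # start receiving and processing data
ssc.awaitTermination()  # block until the streaming context is stopped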
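Because `reduceByKey` on a DStream sees only the records of the current micro-batch, the printed counts restart from zero every batch interval. The plain-Python sketch below models that behavior with hypothetical message values split into two batches; the helper `word_counts_per_batch` is not part of PySpark. Keeping running totals across batches would need a stateful operation such as `updateStateByKey`, which also requires a checkpoint directory set with `ssc.checkpoint(...)`.

```python
from collections import Counter


def word_counts_per_batch(batches):
    """Count words separately in each micro-batch (illustrative model of the DStream above).

    No state is carried from one batch to the next, so every batch starts from zero.
    """
    results = []
    for lines in batches:
        words = [w for line in lines for w in line.split(" ")]
        results.append(dict(Counter(words)))
    return results


# Hypothetical message values arriving in two consecutive 1-second batches
batches = [["hello world", "hello"], ["world"]]
for i, counts in enumerate(word_counts_per_batch(batches)):
    print(f"batch {i}: {counts}")
# batch 0: {'hello': 2, 'world': 1}
# batch 1: {'world': 1}
```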

# 4.) Spark SQL and Data Source Integration:
   a) Write a Python program to connect Spark with a relational database (e.g., MySQL, PostgreSQL).

In [None]:
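from pyspark.sql import SparkSession

# The JDBC driver jar (here PostgreSQL) must be on the driver's classpath; this setting is only
# picked up if this call starts a new JVM, i.e. before any other SparkSession exists in the process
spark = SparkSession.builder \
    .appName("SparkDatabaseConnection") \
    .config("spark.driver.extraClassPath", "/path/to/jdbc/driver.jar") \
    .getOrCreate()

database_url = "jdbc:postgresql://localhost:5432/mydatabase"
database_properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "org.postgresql.Driver"
}
# Read a database table into a DataFrame
df = spark.read.jdbc(url=database_url, table="mytable", properties=database_properties)
df.show()
# Write the DataFrame to another table, replacing it if it already exists
df.write.jdbc(url=database_url, table="newtable", mode="overwrite", properties=database_properties)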

b) Perform SQL operations on the data stored in the database using Spark SQL.

In [None]:
# Register the JDBC DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("data")
spark.sql('select * from data').show()

c) Explore the integration capabilities of Spark with other data sources, such as Hadoop Distributed File System (HDFS) or Amazon S3.

## Solution:

##### Reading Data from HDFS:

Spark can read data from HDFS using the spark.read API. It supports various file formats such as Parquet, JSON, CSV, and Avro (the Avro reader ships as the separate spark-avro module).
You can specify the HDFS file path when reading data, like spark.read.parquet("hdfs://localhost:9000/path/to/file.parquet").
Spark splits the input files into partitions and reads them in parallel on the executors, which allows efficient distributed processing of large datasets.

##### Writing Data to HDFS:

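Spark provides the DataFrame.write API to write data to HDFS. It supports various file formats and partitioning options.
You can save a DataFrame to HDFS with df.write.parquet("hdfs://localhost:9000/path/to/output.parquet"); the method name (parquet, json, csv, ...) selects the file format, and the output path becomes a directory containing one or more part files.
Partitioning the output with df.write.partitionBy(...) stores the rows for each value of the chosen column in a separate subdirectory, which keeps the data organized and lets queries that filter on that column skip the other subdirectories.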
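Calling `df.write.partitionBy("country").parquet(path)` writes one subdirectory per distinct value, named like `country=US`, and a query that filters on that column only needs to read the matching subdirectories (partition pruning). The plain-Python sketch below models that directory layout and the pruning step; the `country` column, the sample rows, and both helper functions are hypothetical.

```python
from collections import defaultdict


def partition_paths(rows, base, column):
    """Group rows into Hive-style directories such as base/country=US (illustrative model)."""
    layout = defaultdict(list)
    for row in rows:
        layout[f"{base}/{column}={row[column]}"].append(row)
    return dict(layout)


def prune(layout, column, wanted):
    """Return only the directories a filter on `column == wanted` needs to read."""
    return [path for path in layout if path.endswith(f"/{column}={wanted}")]


rows = [
    {"id": 1, "country": "US"},
    {"id": 2, "country": "IN"},
    {"id": 3, "country": "US"},
]
base = "hdfs://localhost:9000/path/to/output.parquet"
layout = partition_paths(rows, base, "country")
print(sorted(layout))
print(prune(layout, "country", "US"))
```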

##### Reading Data from S3:

Spark can read data from S3 using the spark.read API. It supports various file formats such as Parquet, JSON, CSV, and Avro (via the separate spark-avro module).
You can specify the S3 bucket and file path when reading data, like spark.read.parquet("s3a://bucket-name/path/to/file.parquet"). The s3a:// scheme requires the hadoop-aws module on the classpath and valid AWS credentials.
Spark splits the input into partitions and reads them in parallel, enabling distributed processing of large datasets stored in S3.
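The `s3a://` scheme is served by the `hadoop-aws` module, which has to be on the classpath, and the connector needs credentials. A minimal sketch, assuming static access keys: the hypothetical function below only builds configuration entries, which you would pass to `SparkSession.builder.config(key, value)` before the first `getOrCreate()` in the process. Keys prefixed with `spark.hadoop.` are copied into the Hadoop configuration. The credential values and the `hadoop-aws` version are placeholders; the version should match the Hadoop libraries your Spark build uses.

```python
def s3a_spark_conf(access_key, secret_key, hadoop_aws_version):
    """Build Spark config entries for reading s3a:// paths (sketch; all values are placeholders)."""
    return {
        # Download hadoop-aws (and its dependencies) when the session starts
        "spark.jars.packages": f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
        # spark.hadoop.* entries are copied into the Hadoop configuration used by the S3A connector
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
    }


conf = s3a_spark_conf("MY_ACCESS_KEY", "MY_SECRET_KEY", "3.3.4")
for key, value in conf.items():
    print(f"{key}={value}")
```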

##### Writing Data to S3:

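Spark provides the DataFrame.write API to write data to S3. It supports various file formats and partitioning options.
You can save a DataFrame to S3 with df.write.parquet("s3a://bucket-name/path/to/output.parquet"); the method name (parquet, json, csv, ...) selects the file format, and the output path becomes a prefix containing one or more part files.
Partitioning the output with df.write.partitionBy(...) stores the rows for each value of the chosen column under a separate prefix, which keeps the data organized and lets queries that filter on that column skip the other prefixes.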