# Spark Tasks:

1. **Data Aggregation:**
   Read a dataset containing sales transactions. Calculate the total sales amount for each product category using Spark's `groupBy` and aggregation functions.

2. **Log Analysis:**
   Analyze server log data to find the most frequently accessed URLs and their corresponding IP addresses. Use Spark SQL to query and visualize the results.

## 1. Data Aggregation: 

#### Importing libraries:

In [1]:
from pyspark.sql import SparkSession
# Note: importing `sum` here shadows Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import sum, date_format, desc

#### Creating (or reusing) a Spark session:

In [2]:
spark = SparkSession \
    .builder \
    .appName("Sales") \
    .getOrCreate()

### Read a dataset containing sales transactions:

In [3]:
sales_df = spark.read.csv("sales.csv", header=True, inferSchema=True)
sales_df.show()

+---------+-----------+------+
|  Product|   Category|Amount|
+---------+-----------+------+
|Product A|Electronics|   100|
|Product B|   Clothing|    50|
|Product C|Electronics|   200|
|Product D|   Clothing|    75|
|Product E|Electronics|   150|
+---------+-----------+------+



### Calculate the total sales amount for each product category using Spark's `groupBy` and aggregation functions:

In [4]:
total_sales = sales_df.groupBy("Category").agg(sum("Amount").alias("total_sales_amount"))

In [5]:
total_sales.show()

+-----------+------------------+
|   Category|total_sales_amount|
+-----------+------------------+
|Electronics|               450|
|   Clothing|               125|
+-----------+------------------+
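
As a sanity check on the totals above, the same per-category sums can be reproduced in plain Python from the five rows shown earlier. This is only a verification sketch, not part of the Spark job; the `rows` list is an assumption, retyped by hand from the `sales_df.show()` output.

```python
from collections import defaultdict

# Assumption: rows retyped by hand from the sales_df.show() output above.
rows = [
    ("Product A", "Electronics", 100),
    ("Product B", "Clothing", 50),
    ("Product C", "Electronics", 200),
    ("Product D", "Clothing", 75),
    ("Product E", "Electronics", 150),
]

# Group by category and accumulate the amounts, mirroring groupBy("Category") + sum("Amount").
# A running += is used instead of the built-in sum(), which the notebook's import shadows.
totals = defaultdict(int)
for _product, category, amount in rows:
    totals[category] += amount

print(dict(totals))
```

The printed dictionary should agree with the `total_sales_amount` column for each category.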



## 2. Log Analysis:

### Analyze server log data to find the most frequently accessed URLs and their corresponding IP addresses. Use Spark SQL to query and visualize the results:

In [6]:
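# The log file is space-delimited with no header row, so the columns are named explicitly.
server_log_df = spark.read.load("server_log.txt", format="csv", sep=" ", inferSchema=True, header=False) \
    .toDF("date", "time", "page_name", "IP_address") \
    .withColumn("time", date_format("time", "HH:mm:ss"))  # keep only HH:mm:ss; inference may read `time` as a full timestamp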

server_log_df.show()

+----------+--------+---------+-------------+
|      date|    time|page_name|   IP_address|
+----------+--------+---------+-------------+
|2023-08-01|10:15:23|   /page1|192.168.1.100|
|2023-08-01|10:20:45|   /page2|192.168.1.101|
|2023-08-01|10:30:12|   /page1|192.168.1.102|
|2023-08-01|10:32:56|   /page3|192.168.1.100|
|2023-08-01|10:35:09|   /page2|192.168.1.103|
+----------+--------+---------+-------------+



In [7]:
most_popular_pages = server_log_df.groupBy("page_name").count().orderBy(desc('count'))

most_popular_pages.show()

+---------+-----+
|page_name|count|
+---------+-----+
|   /page1|    2|
|   /page2|    2|
|   /page3|    1|
+---------+-----+



In [8]:
server_log_df.createOrReplaceTempView("server_log")

In [9]:
spark.sql("SELECT page_name, IP_address FROM server_log ORDER BY page_name ASC").show(truncate=False)

+---------+-------------+
|page_name|IP_address   |
+---------+-------------+
|/page1   |192.168.1.100|
|/page1   |192.168.1.102|
|/page2   |192.168.1.101|
|/page2   |192.168.1.103|
|/page3   |192.168.1.100|
+---------+-------------+



#### Stopping the Spark session

In [10]:
spark.stop()