### Spark Tasks:

1. **Data Aggregation:**
   Read a dataset containing sales transactions. Calculate the total sales amount for each product category using Spark's `groupBy` and aggregation functions.

In [90]:
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum
from pyspark.sql.functions import col, window, expr, date_format
from pyspark.sql  import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

In [91]:
spark = SparkSession.builder.appName("TotalSales").getOrCreate()

In [92]:
file = r"C:\Users\krent\OneDrive\Desktop\PythonII\PythonIIPractical\week11\sales.csv"

In [93]:
data = spark.read.csv(file,header=True,inferSchema=True)

In [94]:
data

DataFrame[Product: string, Category: string, Amount: int]

In [95]:
total_sales = data.groupBy("Category").agg(sum("Amount").alias("Total_Sales"))

In [96]:
total_sales.show()

+-----------+-----------+
|   Category|Total_Sales|
+-----------+-----------+
|Electronics|        450|
|   Clothing|        125|
+-----------+-----------+



In [97]:
spark.stop()

2. **Log Analysis:**
   Analyze server log data to find the most frequently accessed URLs and their corresponding IP addresses. Use Spark SQL to query and visualize the results.

In [98]:
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

In [99]:
df = spark.read.text(r"C:\Users\krent\OneDrive\Desktop\PythonII\PythonIIPractical\week11\server_log.txt").selectExpr("split(value, ' ')[2] as url", "split(value, ' ')[3] as ip_address")

In [100]:
df

DataFrame[url: string, ip_address: string]

In [101]:
ip_count = df.groupBy("url", "ip_address").count()

In [102]:
rank_url_count = ip_count.withColumn("Access rank", F.dense_rank().over(Window.orderBy(F.desc("count"))))

In [103]:
result = rank_url_count.select(
    F.col("url").alias("URL"),
          F.col("ip_address").alias("IP Address"),
          F.col("Access Rank"),
          F.col("count").alias("Access Count")
         )
result.show()

+------+-------------+-----------+------------+
|   URL|   IP Address|Access Rank|Access Count|
+------+-------------+-----------+------------+
|/page1|192.168.1.102|          1|           1|
|/page2|192.168.1.101|          1|           1|
|/page2|192.168.1.103|          1|           1|
|/page3|192.168.1.100|          1|           1|
|/page1|192.168.1.100|          1|           1|
+------+-------------+-----------+------------+



In [104]:
spark.stop()

### MapReduce Tasks:

1. **URL Access Count:**
   Given a log file containing records of URLs accessed and their corresponding timestamps, use MapReduce to count the number of times each URL was accessed within a specific time window.


In [105]:
spark = SparkSession.builder.appName("UrlAccessCount").getOrCreate()

In [106]:
data = spark.read.text(r"C:\Users\krent\OneDrive\Desktop\PythonII\PythonIIPractical\week11\access_log.txt")

In [107]:
df = data.selectExpr("split(value, ' ')[0] as date_time", "split(value, ' ')[2] as url")

In [108]:
df

DataFrame[date_time: string, url: string]

In [109]:
time = "2 hours"
result = df.groupBy("url", window("date_time", time)).count()

In [110]:
modified_result = result.select(
    col("url").alias("URL"),
    date_format(col("window.start"), "yyyy-mm-dd").alias("Date"),
    col("count").alias("Access Count")
)

In [111]:
modified_result.show()

+------+----------+------------+
|   URL|      Date|Access Count|
+------+----------+------------+
|/page1|2023-00-31|           2|
|/page3|2023-00-31|           1|
|/page2|2023-00-31|           2|
+------+----------+------------+



In [112]:
spark.stop()

2. **Follower Recommendations:**
   Given a dataset representing a social network's following graph. Use MapReduce to recommend the users to follow for another users who do have a mutual followers,
but do not follow each other.

Example:

   a follows b
   b follows d.
In this case, recommend a to follow d ---> NOT FINISHED

In [113]:
conf = SparkConf().setMaster("local").setAppName("FollowerRecommendations")

In [114]:
sc = SparkContext(conf=conf)

In [116]:
data = sc.textFile(r"C:\Users\krent\OneDrive\Desktop\PythonII\PythonIIPractical\week11\follower_graph.txt")

In [117]:
pairs = data.map(lambda line: line.split()).map(lambda parts: (parts[0], parts[1:]))