### Spark Tasks:

1. **Data Aggregation:**
   Read a dataset containing sales transactions. Calculate the total sales amount for each product category using Spark's `groupBy` and aggregation functions.

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

spark = SparkSession.builder.appName('TotalSalesAmount').getOrCreate()

data = spark.read.csv('sales.csv', header=True)
data_grouped = data.groupBy('Category')
total_amount = data_grouped.agg(sum('Amount').alias('TotalAmount'))
total_amount.show()
spark.stop()

+-----------+-----------+
|   Category|TotalAmount|
+-----------+-----------+
|Electronics|      450.0|
|   Clothing|      125.0|
+-----------+-----------+



2. **Log Analysis:**
   Analyze server log data to find the most frequently accessed URLs and their corresponding IP addresses. Use Spark SQL to query and visualize the results.

In [14]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import count

spark = SparkSession.builder.appName('ServerLogAnalysis').getOrCreate()

server_log_schema = StructType([
    StructField('timestamp', StringType(), True),
    StructField('url', StringType(), True),
    StructField('ip_address', StringType(), True)
])

access_log_schema = StructType([
    StructField('timestamp', StringType(), True),
    StructField('url', StringType(), True)
])

server_log_df = spark.read.csv('server_log.txt', header=False, schema=server_log_schema, sep=' ')
access_log_df = spark.read.csv('access_log.txt', header=False, schema=access_log_schema, sep=' ')

server_log_df.createOrReplaceTempView('server_log')
access_log_df.createOrReplaceTempView('access_log')

query = """SELECT sl.ip_address, COUNT(*) AS access_count
    FROM server_log sl
    INNER JOIN access_log al ON sl.url = al.url
    GROUP BY sl.ip_address
    ORDER BY access_count DESC"""

result = spark.sql(query)

result.show()

spark.stop()

+----------+------------+
|ip_address|access_count|
+----------+------------+
|    /page1|           2|
|    /page2|           2|
|    /page3|           1|
+----------+------------+



### MapReduce Tasks:

1. **URL Access Count:**
   Given a log file containing records of URLs accessed and their corresponding timestamps, use MapReduce to count the number of times each URL was accessed within a specific time window.

In [63]:
from collections import defaultdict
from datetime import datetime, timedelta
from functools import reduce

start_time = datetime.strptime('2023-08-01 10:00:00', '%Y-%m-%d %H:%M:%S')
time_window_minutes = 60
end_time = start_time + timedelta(minutes=time_window_minutes)

def mapper(log_entry):
    components = log_entry.split()
    timestamp = datetime.strptime(components[0] + ' ' + components[1], '%Y-%m-%d %H:%M:%S')
    url = components[2]
    ip = components[3] if len(components) > 3 else None
    return url, (timestamp, ip)

def reducer(result_dict, value):
    url, values = value
    access_count = 0
    for timestamp, ip in values:
        if start_time <= timestamp <= end_time:
            access_count += 1
    result_dict[url] = access_count
    return result_dict

log_data_file = 'server_log.txt'
with open(log_data_file, 'r') as file:
    log_data = file.readlines()

mapped_data = defaultdict(list)
for entry in log_data:
    url, value = mapper(entry)
    mapped_data[url].append(value)

reduced_data = reduce(reducer, mapped_data.items(), {})

for url, count in reduced_data.items():
    print(f'url: {url}, Access Count: {count}')


url: /page1, Access Count: 2
url: /page2, Access Count: 2
url: /page3, Access Count: 1


2. **Follower Recommendations:**
   Given a dataset representing a social network's following graph. Use MapReduce to recommend the users to follow for another users who do have a mutual followers,
but do not follow each other.

In [67]:
from collections import defaultdict
from functools import reduce

with open('follower_graph.txt', 'r') as file:
    follower_data = file.readlines()

def mapper(line):
    parts = line.split()
    user = parts[0]
    followers = parts[1:]
    return [((follower1, follower2), user) for i, follower1 in enumerate(followers)
                for j, follower2 in enumerate(followers) if i != j]

def reducer(recommendations, follower_pair_users):
    follower_pair, user = follower_pair_users
    user1, user2 = follower_pair
    if (user2, user1) not in follower_data:
        recommendations[user1].append(user2)
    return recommendations

recommendations = reduce(reducer, (pair_user for line in follower_data
                    for pair_user in mapper(line)), defaultdict(list))

for user, rec_list in recommendations.items():
    recommendations_str = ', '.join(rec_list)
    print(f'User {user} can follow users: {recommendations_str}')


User 61 can follow users: 865, 999, 382, 18, 578, 828, 665, 502, 915, 790, 530, 670, 334, 545, 631, 660, 926, 659, 417, 647, 782, 368, 741, 555, 796, 236, 591, 897, 24, 935, 692, 620, 910, 347, 929, 740, 867, 472, 57, 197, 675, 624, 992, 218, 995, 202, 339, 638, 102, 927, 176, 269, 344, 565, 557, 719, 11, 960, 722, 721, 984, 382, 894, 334, 323, 681, 558, 457, 862, 447, 962, 223, 610, 985, 803, 425, 979, 842, 766, 702, 553, 344, 816, 825, 331, 749, 388, 637, 483, 192, 858, 269, 108, 687, 297, 927, 396, 304, 141, 532, 338, 945, 46, 767, 976, 960, 181, 615, 200, 284, 655, 127, 776, 156, 404, 905, 665, 417, 239, 381, 918, 535, 981, 406, 359, 955, 783, 50, 338, 189, 710, 153, 807, 577, 683, 694, 356, 290, 72, 45, 831, 302, 852, 587, 720, 922, 343, 617, 4, 264, 828, 588, 507, 721, 797, 81, 482, 8, 152, 228, 612, 554, 80, 245, 622, 412, 440, 708, 654, 191, 224, 983, 828, 164, 786, 5, 797, 93, 867, 826, 644, 388, 956, 386, 706, 945, 747, 159, 598, 960, 997, 521, 552, 437, 672, 186, 535, 26, 48