In [19]:
from collections import defaultdict

# Sample listening logs: one dict per play event, with 0/1 flags for
# Shared, Radio and Skip.
logs = [
    {"UserId": 111115, "TrackId": 222, "Shared": 0, "Radio": 1, "Skip": 0},
    {"UserId": 111113, "TrackId": 225, "Shared": 1, "Radio": 0, "Skip": 0},
    {"UserId": 111117, "TrackId": 223, "Shared": 0, "Radio": 1, "Skip": 1},
    {"UserId": 111115, "TrackId": 225, "Shared": 1, "Radio": 0, "Skip": 0},
]

# -------- MAP Step --------
# Every log row is expanded into five (metric, value) pairs, always in the
# same order. The "unique_listener" pair carries the UserId itself as its
# value (it is de-duplicated in the reduce step); the flag metrics carry the
# row's 0/1 flag; "listened_total" carries a constant 1 per row.
mapped = []
for entry in logs:
    mapped.extend(
        [
            ("unique_listener", entry["UserId"]),
            ("shared", entry["Shared"]),
            ("radio", entry["Radio"]),
            ("listened_total", 1),
            ("skipped", entry["Skip"]),
        ]
    )

print("Mapped Output (sample):")
print(mapped[:10])

# -------- SHUFFLE Step --------
# Group the emitted values by metric name, preserving emission order.
shuffled = defaultdict(list)
for metric, emitted in mapped:
    shuffled[metric].append(emitted)

print("\nShuffled Output:")
for metric, bucket in shuffled.items():
    print(metric, ":", bucket)

# -------- REDUCE Step --------
# "unique_listener" collapses to the number of distinct UserIds; every other
# metric is the plain sum of its emitted values.
results = {
    metric: len(set(bucket)) if metric == "unique_listener" else sum(bucket)
    for metric, bucket in shuffled.items()
}

print("\nFinal Results:")
for metric, total in results.items():
    print(f"{metric}: {total}")


Mapped Output (sample):
[('unique_listener', 111115), ('shared', 0), ('radio', 1), ('listened_total', 1), ('skipped', 0), ('unique_listener', 111113), ('shared', 1), ('radio', 0), ('listened_total', 1), ('skipped', 0)]

Shuffled Output:
unique_listener : [111115, 111113, 111117, 111115]
shared : [0, 1, 0, 1]
radio : [1, 0, 1, 0]
listened_total : [1, 1, 1, 1]
skipped : [0, 0, 1, 0]

Final Results:
unique_listener: 3
shared: 2
radio: 2
listened_total: 4
skipped: 1
