In [1]:
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark import SparkConf, SparkContext

from typing import Tuple, List

In [37]:
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [49]:
tourists_path = 'tourists.txt'
POIs_path = 'POIs.txt'
visits_path = 'visits.txt'

tourists_rdd = sc.textFile(tourists_path)
POIs_rdd = sc.textFile(POIs_path)
visits_rdd = sc.textFile(visits_path)

In [48]:
# ---------------------------------------------------------
# PART 1: Tourist(s) with the most visits to Italian POIs
# ---------------------------------------------------------
def poiid1(line):
    fields = line.split(",")
    poiid = fields[0]
    return (poiid, 1)

poiid1rdd = POIs_rdd.filter(lambda line: line.find(",Italy") >= 0).map(poiid1)
poiid1rdd.collect()

[('POI1', 1), ('POI2', 1), ('POI3', 1), ('POI4', 1), ('POI5', 1)]

In [5]:
def poiidcodtt(line):
    fields = line.split(",")
    poiid = fields[2]
    codt = fields[0]
    return (poiid, codt)

poiidcodt = visits_rdd.map(poiidcodtt)
poiidcodt.collect()

[('POI1', 'T1'),
 ('POI2', 'T1'),
 ('POI3', 'T1'),
 ('POI1', 'T2'),
 ('POI4', 'T2'),
 ('POI6', 'T3'),
 ('POI1', 'T3'),
 ('POI5', 'T4')]

In [47]:
# Join between visits and Italian POIs
# Format: [poiid, (codt, 1)]
poiidcodtjoinpoiid1rdd = poiidcodt.join(poiid1rdd)
poiidcodtjoinpoiid1rdd.collect()

[('POI1', ('T1', 1)),
 ('POI1', ('T2', 1)),
 ('POI1', ('T3', 1)),
 ('POI2', ('T1', 1)),
 ('POI3', ('T1', 1)),
 ('POI5', ('T4', 1)),
 ('POI4', ('T2', 1))]

In [7]:
# Map to (codt, 1)
codt1 = poiidcodtjoinpoiid1rdd.map(lambda x: (x[1][0], 1))
poiidcodtjoinpoiid1rdd.collect()

[('POI1', ('T1', 1)),
 ('POI1', ('T2', 1)),
 ('POI1', ('T3', 1)),
 ('POI2', ('T1', 1)),
 ('POI3', ('T1', 1)),
 ('POI5', ('T4', 1)),
 ('POI4', ('T2', 1))]

In [8]:
# Sum visits per tourist: (codt, sum)
codtsum = codt1.reduceByKey(lambda a, b: a + b)
poiidcodtjoinpoiid1rdd.collect()

[('POI1', ('T1', 1)),
 ('POI1', ('T2', 1)),
 ('POI1', ('T3', 1)),
 ('POI2', ('T1', 1)),
 ('POI3', ('T1', 1)),
 ('POI5', ('T4', 1)),
 ('POI4', ('T2', 1))]

In [9]:
# Calculate maximum visits
maxvisits = codtsum.values().max()
poiidcodtjoinpoiid1rdd.collect()

[('POI1', ('T1', 1)),
 ('POI1', ('T2', 1)),
 ('POI1', ('T3', 1)),
 ('POI2', ('T1', 1)),
 ('POI3', ('T1', 1)),
 ('POI5', ('T4', 1)),
 ('POI4', ('T2', 1))]

In [10]:
# Filter for tourists with maximum visits
res1 = codtsum.filter(lambda x: x[1] == maxvisits).keys()
res1.collect()

['T1']

In [None]:
# Save result 
res1.saveAsTextFile("output_path1")

In [11]:
# ---------------------------------------------------------
# PART 2: Number of distinct categories visited in Italy in 2024
# ---------------------------------------------------------
# Function to map Visits: (POIID, CodT)
def poiidcodtt2(line):
    fields = line.split(",")
    poiid = fields[2]
    codt = fields[0]
    return (poiid, codt)

# Filter visits for the year 2024 and map
poiidcodt2 = visits_rdd.filter(lambda line: line.split(",")[1].startswith("2024"))\
                       .map(poiidcodtt2)
poiidcodt2.collect()

[('POI1', 'T1'),
 ('POI2', 'T1'),
 ('POI3', 'T1'),
 ('POI1', 'T2'),
 ('POI4', 'T2'),
 ('POI6', 'T3'),
 ('POI5', 'T4')]

In [14]:
# Function to map POIs: (POIID, Category)
def poiidcategory(line):
    fields = line.split(",")
    poiid = fields[0]
    category = fields[2]
    return (poiid, category)

# Filter POIs located in Italy and map
poiidcategoryrdd = POIs_rdd.filter(lambda line: line.find(",Italy") >= 0)\
                           .map(poiidcategory)

poiidcategoryrdd.collect()

[('POI1', 'Museum'),
 ('POI2', 'Museum'),
 ('POI3', 'Restaurant'),
 ('POI4', 'Railway station'),
 ('POI5', 'Hotel')]

In [15]:
# Join POI categories with visits: (POIID, (Category, CodT))
poidcategorycodtrdd = poiidcategoryrdd.join(poiidcodt2)
poidcategorycodtrdd.collect()

[('POI1', ('Museum', 'T1')),
 ('POI1', ('Museum', 'T2')),
 ('POI2', ('Museum', 'T1')),
 ('POI3', ('Restaurant', 'T1')),
 ('POI5', ('Hotel', 'T4')),
 ('POI4', ('Railway station', 'T2'))]

In [16]:
# Map to (CodT, Category) 
codtcategory = poidcategorycodtrdd.map(lambda x: (x[1][1], x[1][0])).distinct()
codtcategory.collect()

[('T2', 'Railway station'),
 ('T1', 'Restaurant'),
 ('T1', 'Museum'),
 ('T2', 'Museum'),
 ('T4', 'Hotel')]

In [17]:
# Count distinct categories for tourists with at least one visit
codtnumcat = codtcategory.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
codtnumcat.collect()

[('T2', 2), ('T1', 2), ('T4', 1)]

In [19]:
# Function to extract CodT from tourists file
def codt(line):
    fields = line.split(",")
    codt = fields[0]
    return (codt, 1)

# Get all tourist identifiers
allturist = tourists_rdd.map(codt).keys()
print(allturist)

PythonRDD[44] at RDD at PythonRDD.scala:56


In [40]:
# Identify tourists with zero visits in Italy in 2024
codt_with_visits = codtcategory.keys()
print(codt_with_visits)

PythonRDD[86] at RDD at PythonRDD.scala:56


In [44]:
turistzero = allturist.subtract(codt_with_visits).map(lambda x: (x, 0))
turistzero.collect()

[('T3', 0)]

In [45]:
# Union the results: (Tourists with visits) + (Tourists with zero visits)
res2 = codtnumcat.union(turistzero)
res2.collect()

[('T2', 2), ('T1', 2), ('T4', 1), ('T3', 0)]

In [46]:
# Save result to output folder
res2.saveAsTextFile("output_path2")