In [1]:
import csv
from datetime import datetime, timedelta

from prettytable import PrettyTable
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [14]:
# Setup: start (or reuse) a Spark session and load the rides CSV as an RDD
# whose elements are lists of column strings.
# getOrCreate() keeps this cell re-runnable; calling SparkContext(...) a second
# time in the same kernel raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.appName("FakeRidesAnalysis").getOrCreate()
sc = spark.sparkContext
FP = "fake_rides.csv"

# Read the file once and reuse the RDD for both the header and the data rows.
raw_lines = sc.textFile(FP)
header = raw_lines.first()
# Parse with the csv module instead of str.split(',') so a quoted field that
# contains a comma does not shift the positional column indexes (x[0]..x[14])
# that every task cell relies on.
rdd = raw_lines.filter(lambda line: line != header).mapPartitions(csv.reader)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [3]:
# Task 0: Check users and drivers count
# Column 1 holds user IDs and column 0 driver IDs (as labelled in the table).


def _distinct_ids(col_idx):
    """Return how many distinct values appear in column `col_idx` of `rdd`."""
    return rdd.map(lambda row: row[col_idx]).distinct().count()


users_count = _distinct_ids(1)
drivers_count = _distinct_ids(0)

counts_table = PrettyTable(["Category", "Count"])
for label, value in (("Users", users_count), ("Drivers", drivers_count)):
    counts_table.add_row([label, value])

print(counts_table)



+----------+-------+
| Category | Count |
+----------+-------+
|  Users   | 29891 |
| Drivers  |  4925 |
+----------+-------+




In [4]:
# Task 1: View 100 drivers with the highest average score
# Key each ride by driver ID (col 0) with (score, 1) so one reduce produces
# both the score total and the ride count for every driver.
driver_scores = rdd.map(lambda row: (row[0], (float(row[10]), 1)))
score_totals = driver_scores.reduceByKey(
    lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
avg_scores = score_totals.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
# Negated key -> highest averages first.
top_100_drivers = avg_scores.takeOrdered(100, key=lambda kv: -kv[1])

top_drivers_table = PrettyTable(["Driver ID", "Average Score"])
for driver_id, mean_score in top_100_drivers:
    top_drivers_table.add_row([driver_id, round(mean_score, 2)])

print(top_drivers_table)



+-----------+---------------+
| Driver ID | Average Score |
+-----------+---------------+
|    2452   |      4.13     |
|    1325   |      4.13     |
|    575    |      4.13     |
|    3578   |      4.13     |
|    3323   |      4.13     |
|    226    |      4.13     |
|    3639   |      4.13     |
|    4529   |      4.13     |
|    1489   |      4.13     |
|    1538   |      4.13     |
|    3722   |      4.12     |
|    2566   |      4.12     |
|    4829   |      4.12     |
|    4321   |      4.12     |
|    3241   |      4.12     |
|    2393   |      4.12     |
|    3629   |      4.12     |
|    780    |      4.12     |
|    3048   |      4.12     |
|    2674   |      4.12     |
|    2567   |      4.12     |
|    2805   |      4.12     |
|    4441   |      4.12     |
|    3652   |      4.12     |
|    4890   |      4.12     |
|    1109   |      4.12     |
|    2528   |      4.12     |
|    1215   |      4.12     |
|    2135   |      4.12     |
|    2999   |      4.12     |
|    1767 



In [5]:
# Task 2: View drivers with average score below 3.5
# Reads `avg_scores` (driver ID -> mean score) built in Task 1.
low_score_drivers = avg_scores.filter(lambda kv: kv[1] < 3.5)

low_score_table = PrettyTable(["Drivers with avg score < 3.5", "Score"])
for driver_id, mean_score in low_score_drivers.collect():
    low_score_table.add_row([driver_id, round(mean_score, 2)])

print(low_score_table)



+------------------------------+-------+
| Drivers with avg score < 3.5 | Score |
+------------------------------+-------+
+------------------------------+-------+


                                                                                

In [6]:
# Task 3: Check which timeframe has the most daily rides
# For each start hour, report the mean number of rides per calendar day over
# the whole period the data covers.
RIDE_TS_FORMAT = '%Y-%m-%d %H:%M:%S'

# min/max are taken on the raw strings; this assumes zero-padded timestamps so
# string order matches time order -- TODO confirm against the data.
start_date_str = rdd.map(lambda row: row[7]).min()
end_date_str = rdd.map(lambda row: row[8]).max()

start_date = datetime.strptime(start_date_str, RIDE_TS_FORMAT).date()
end_date = datetime.strptime(end_date_str, RIDE_TS_FORMAT).date()
# Inclusive day span: a single-day dataset divides by 1, not 0.
total_days = (end_date - start_date).days + 1

hourly_rides = rdd.map(
    lambda row: (datetime.strptime(row[7], RIDE_TS_FORMAT).hour, 1))
hourly_rides_count = hourly_rides.reduceByKey(lambda a, b: a + b)
hourly_average_rides = (
    hourly_rides_count
    .map(lambda kv: (kv[0], round(kv[1] / total_days, 2)))
    .sortBy(lambda kv: kv[1], ascending=False)
)

timeframes_table = PrettyTable(["Hour", "Average Rides"])
for hour, avg_rides in hourly_average_rides.collect():
    timeframes_table.add_row([hour, avg_rides])

print(timeframes_table)



+------+---------------+
| Hour | Average Rides |
+------+---------------+
|  17  |    10950.36   |
|  8   |    9123.42    |
|  18  |    9122.27    |
|  19  |    8209.13    |
|  16  |    7298.52    |
|  9   |    6388.72    |
|  20  |    5474.64    |
|  10  |    4564.44    |
|  15  |    4560.99    |
|  21  |    3651.74    |
|  0   |     2738.2    |
|  14  |     2737.1    |
|  13  |    2736.93    |
|  7   |    2736.41    |
|  11  |    1825.17    |
|  1   |     1825.1    |
|  12  |    1824.93    |
|  2   |     912.87    |
|  22  |     912.46    |
|  6   |     912.22    |
|  3   |     911.89    |
|  23  |     911.5     |
|  4   |     455.97    |
|  5   |     455.92    |
+------+---------------+




In [7]:
# Task 4: View 100 users with the highest average feedback
# The per-user averages get their own name rather than re-binding
# `avg_scores`: Task 2 reads the driver averages Task 1 stores there, so
# overwriting it would make a re-run of Task 2 silently filter user feedback.
user_scores = rdd.map(lambda x: (x[1], (float(x[13]), 1)))
avg_user_feedback = (
    user_scores
    # Sum (score, count) pairs per user ID (col 1), then divide for the mean.
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .map(lambda x: (x[0], x[1][0] / x[1][1]))
)
# Negated key -> highest averages first.
top_100_users = avg_user_feedback.takeOrdered(100, key=lambda x: -x[1])

top_users_table = PrettyTable(["User ID", "Average Feedback"])
for user in top_100_users:
    top_users_table.add_row([user[0], round(user[1], 2)])

print(top_users_table)

                                                                                

+---------+------------------+
| User ID | Average Feedback |
+---------+------------------+
|  13724  |       3.88       |
|  21081  |       3.87       |
|  13070  |       3.87       |
|  29762  |       3.87       |
|   3596  |       3.87       |
|  17181  |       3.87       |
|   2634  |       3.87       |
|  27235  |       3.87       |
|  14684  |       3.87       |
|  13972  |       3.87       |
|  27564  |       3.87       |
|  11223  |       3.87       |
|   7583  |       3.87       |
|  17428  |       3.87       |
|   7665  |       3.86       |
|   1799  |       3.86       |
|  11813  |       3.86       |
|  29643  |       3.86       |
|  10696  |       3.86       |
|   2508  |       3.86       |
|  10557  |       3.86       |
|  26499  |       3.86       |
|  14594  |       3.86       |
|  25436  |       3.86       |
|  19481  |       3.86       |
|  12510  |       3.86       |
|  21616  |       3.86       |
|  18435  |       3.86       |
|  12977  |       3.86       |
|  11940

In [8]:
# Task 5: View 100 drivers with the most earnings
# "Earnings" here is the sum of the trip-cost column (col 9) over all of a
# driver's rides.
driver_trip_cost = rdd.map(lambda row: (row[0], float(row[9])))
total_trip_cost_by_driver = driver_trip_cost.reduceByKey(
    lambda running, cost: running + cost)
# Negated key -> largest totals first.
top_100_drivers_by_trip_cost = total_trip_cost_by_driver.takeOrdered(
    100, key=lambda kv: -kv[1])

top_earners_table = PrettyTable(["Driver ID", "Total Trip Cost"])
for driver_id, total_cost in top_100_drivers_by_trip_cost:
    top_earners_table.add_row([driver_id, round(total_cost, 2)])

print(top_earners_table)



+-----------+-----------------+
| Driver ID | Total Trip Cost |
+-----------+-----------------+
|    3409   |    301245.94    |
|    4212   |    300800.71    |
|    4033   |    300472.79    |
|    4070   |    300465.92    |
|    848    |    300437.23    |
|    2247   |    300304.62    |
|    801    |    300207.48    |
|    2097   |    300200.04    |
|    465    |    300146.04    |
|    2157   |    300019.37    |
|    2642   |    300011.49    |
|    291    |    299934.03    |
|    1393   |    299912.97    |
|    688    |    299495.23    |
|    3529   |    299401.09    |
|    1872   |     299392.3    |
|    808    |    299358.46    |
|    1722   |    299347.24    |
|    4420   |    299272.27    |
|    1058   |    299268.11    |
|    766    |    299249.68    |
|    2478   |    299194.85    |
|    1388   |    299173.63    |
|    1927   |    299121.44    |
|     9     |    299064.08    |
|    4726   |    298975.02    |
|    3625   |    298943.22    |
|    2764   |     298935.8    |
|    176



In [9]:
# Task 6: View 50 drivers with the most number of night rides

def is_night_ride(start_time_str):
    """Tell whether a ride started during the night window.

    Args:
        start_time_str: ride start timestamp in '%Y-%m-%d %H:%M:%S' form.

    Returns:
        True if the start hour is 0 through 5 (00:00:00-05:59:59), else False.

    Raises:
        ValueError: if ``start_time_str`` does not match the expected format.
    """
    start_hour = datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S').hour
    # .hour is never negative, so only the upper bound needs checking.
    return start_hour < 6


# Keep rides whose start time (col 7) falls in the night window, then count
# them per driver ID (col 0).
night_rides = rdd.filter(lambda row: is_night_ride(row[7]))
night_rides_count = (
    night_rides
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda a, b: a + b)
)
top_night_drivers = (
    night_rides_count
    .sortBy(lambda kv: kv[1], ascending=False)
    .take(50)
)

night_rides_table = PrettyTable(["Driver ID", "Night Rides"])
for driver_id, ride_count in top_night_drivers:
    night_rides_table.add_row([driver_id, ride_count])

print(night_rides_table)

                                                                                

+-----------+-------------+
| Driver ID | Night Rides |
+-----------+-------------+
|    3411   |     1777    |
|    2273   |     1767    |
|    1688   |     1767    |
|    1172   |     1758    |
|    4578   |     1757    |
|    1252   |     1753    |
|    1927   |     1750    |
|    4902   |     1746    |
|    3766   |     1742    |
|    4212   |     1742    |
|    1039   |     1741    |
|    1367   |     1741    |
|    4609   |     1740    |
|    573    |     1740    |
|    2153   |     1739    |
|    715    |     1739    |
|    143    |     1738    |
|    797    |     1737    |
|    3947   |     1737    |
|    345    |     1737    |
|    1049   |     1735    |
|    4026   |     1735    |
|    4348   |     1734    |
|    1485   |     1734    |
|    269    |     1734    |
|    4751   |     1733    |
|    3140   |     1732    |
|    1353   |     1729    |
|    3216   |     1729    |
|    3218   |     1728    |
|    2393   |     1726    |
|    2097   |     1726    |
|    4161   |     17

In [10]:
# Task 7: View most popular positive feedback for drivers
# A ride counts as positive when its driver score (col 10) is between 4 and 5.
# - The score is parsed with float(), as in Task 1, so a value like "4.0" does
#   not raise the ValueError that int() would.
# - Rides with an empty note (col 11) are skipped, as Task 8 does for user
#   notes, so a blank string cannot be reported as the most common feedback.
positive_feedback_notes = rdd.filter(
    lambda x: x[11] and 4 <= float(x[10]) <= 5).map(lambda x: (x[11], 1))
frequency_of_notes = positive_feedback_notes.reduceByKey(lambda x, y: x + y)
# Negated key -> most frequent note first.
most_frequent_notes = frequency_of_notes.takeOrdered(1, key=lambda x: -x[1])

most_positive_table = PrettyTable(["Most Seen Feedback", "Frequency"])
for note in most_frequent_notes:
    most_positive_table.add_row([note[0], note[1]])

print(most_positive_table)



+--------------------+-----------+
| Most Seen Feedback | Frequency |
+--------------------+-----------+
|        fast        |  30002730 |
+--------------------+-----------+




In [11]:
# Task 8: View most popular negative feedback for users
# Keep rides that carry a user note (col 14) and whose user score (col 13)
# is 1 or 2, then count how often each note appears.
low_score_user_notes = (
    rdd.filter(lambda row: row[14] and int(row[13]) in (1, 2))
       .map(lambda row: (row[14], 1))
)
frequency_of_user_notes = low_score_user_notes.reduceByKey(lambda a, b: a + b)
# Negated key -> most frequent note first.
sorted_user_notes = frequency_of_user_notes.takeOrdered(1, key=lambda kv: -kv[1])

worst_clients_table = PrettyTable(["User Notes", "Frequency"])
for note_text, freq in sorted_user_notes:
    worst_clients_table.add_row([note_text, freq])

print(worst_clients_table)



+------------+-----------+
| User Notes | Frequency |
+------------+-----------+
|    rude    |  4001889  |
+------------+-----------+




In [12]:
# Task 9: View longest driver feedbacks
# Pair every non-empty feedback text (col 12) with its character length.
feedback_lengths = (
    rdd.map(lambda row: row[12])
       .filter(bool)
       .map(lambda text: (text, len(text)))
)
# Negated key -> longest texts first.
top_10_feedback_lengths = feedback_lengths.takeOrdered(10, key=lambda kv: -kv[1])

longest_feedbacks_table = PrettyTable(["Driver Feedback", "Length"])
for text, length in top_10_feedback_lengths:
    longest_feedbacks_table.add_row([text, length])

print(longest_feedbacks_table)



+------------------------------------------------------------------------------------------+--------+
|                                     Driver Feedback                                      | Length |
+------------------------------------------------------------------------------------------+--------+
| Interesting style administration international organization difference local especially. |   88   |
|  Relationship senior imagine participant picture understand responsibility performance.  |   86   |
|   Teach administration environment Congress particularly certain environmental dinner.   |   84   |
|   Yourself example institution important organization television hair responsibility.    |   83   |
|   Contain lead although professor Republican responsibility environmental throughout.    |   83   |
|   Republican follow standard responsibility together important international tonight.    |   83   |
|    Government nearly particularly organization development according set institu



In [15]:
# Shut down the SparkContext `sc` from the setup cell and release its
# resources. Re-run the setup cell before running any task cell again.
sc.stop()