In [1]:
from time import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import types as T, functions as F

import pandas as pd

### Settings

In [2]:
LONDON_DIR = "./lab/London postcodes.csv"
NAMES = ["James", "Mary", "Robert", "Patricia", "John", "Jennifer", "Michael", "Linda", "William", "Elizabeth", "David", "Barbara", "Richard", "Susan", "Joseph", "Jessica", "Thomas", "Sarah", "Charles", "Karen", "Christopher", "Nancy", "Daniel", "Lisa", "Matthew", "Betty", "Anthony", "Margaret", "Mark", "Sandra", "Donald", "Ashley", "Steven", "Kimberly", "Paul", "Emily", "Andrew", "Donna", "Joshua", "Michelle", "Kenneth", "Dorothy", "Kevin", "Carol", "Brian", "Amanda", "George", "Melissa", "Edward", "Deborah", "Ronald", "Stephanie", "Timothy", "Rebecca", "Jason", "Sharon", "Jeffrey", "Laura", "Ryan", "Cynthia", "Jacob", "Kathleen", "Gary", "Amy", "Nicholas", "Shirley", "Eric", "Angela", "Jonathan", "Helen", "Stephen", "Anna", "Larry", "Brenda", "Justin", "Pamela", "Scott", "Nicole", "Brandon", "Emma", "Benjamin", "Samantha", "Samuel", "Katherine", "Gregory", "Christine", "Frank", "Debra", "Alexander", "Rachel", "Raymond", "Catherine", "Patrick", "Carolyn", "Jack", "Janet", "Dennis", "Ruth", "Jerry", "Maria", "Tyler", "Heather", "Aaron", "Diane", "Jose", "Virginia", "Adam", "Julie", "Henry", "Joyce", "Nathan", "Victoria", "Douglas", "Olivia", "Zachary", "Kelly", "Peter", "Christina", "Kyle", "Lauren", "Walter", "Joan", "Ethan", "Evelyn", "Jeremy", "Judith", "Harold", "Megan", "Keith", "Cheryl", "Christian", "Andrea", "Roger", "Hannah", "Noah", "Martha", "Gerald", "Jacqueline", "Carl", "Frances", "Terry", "Gloria"]
NUM_POINTS = 100
NUM_NAMES = 100

### Spark session initializing

In [3]:
spark = SparkSession.builder.getOrCreate()

21/11/29 16:29:32 WARN Utils: Your hostname, dragula-idepad resolves to a loopback address: 127.0.1.1; using 192.168.1.194 instead (on interface wlp2s0)
21/11/29 16:29:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/29 16:29:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Loading starting dataframe

In [4]:
schema = T.StructType([T.StructField("Postcode", T.StringType(), True),
    T.StructField("In Use?", T.StringType(), True),
    T.StructField("Latitude", T.DoubleType(), True),
    T.StructField("Longitude", T.DoubleType(), True),
    T.StructField("Easting", T.IntegerType(), True),
    T.StructField("Northing", T.IntegerType(), True),
    T.StructField("GridRef", T.StringType(), True),
    T.StructField("CountryGB", T.StringType(), True),
    T.StructField("District", T.StringType(), True),
    T.StructField("Ward", T.StringType(), True),
    T.StructField("DistrictCode", T.StringType(), True),
    T.StructField("WardCode", T.StringType(), True),
    T.StructField("Country", T.StringType(), True),
    T.StructField("CountyCode", T.StringType(), True),
    T.StructField("Constituency", T.StringType(), True),
    T.StructField("Introduced", T.StringType(), True),
    T.StructField("Terminated", T.StringType(), True),
    T.StructField("Parish", T.StringType(), True),
    T.StructField("NationalPark", T.StringType(), True),
    T.StructField("Population", T.StringType(), True),
    T.StructField("Households", T.StringType(), True),
    T.StructField("Built up area", T.StringType(), True),
    T.StructField("Built up sub-division", T.StringType(), True),
    T.StructField("Lower layer super output area", T.StringType(), True),
    T.StructField("Rural/urban", T.StringType(), True),
    T.StructField("Region", T.StringType(), True),
    T.StructField("Altitude", T.StringType(), True),
    T.StructField("London zone", T.StringType(), True),
    T.StructField("LSOA Code", T.StringType(), True)])

pandas_df = pd.read_csv(LONDON_DIR)

df_london = spark.createDataFrame(pandas_df, schema)

  exec(code_obj, self.user_global_ns, self.user_ns)


### Data preprocessing

In [5]:
names_df = spark.createDataFrame([(name,) for name in (NAMES[:NUM_NAMES])], ["name"])
drivers_df = names_df.filter(F.monotonically_increasing_id()%2 == 0).withColumnRenamed("name", "driver")
clients_df = names_df.filter(F.monotonically_increasing_id()%2 == 1).withColumnRenamed("name", "client")

points_df = df_london.select(F.col("Latitude").alias("lat"), F.col("Longitude").alias("long")).distinct()

start_points = points_df.withColumnRenamed("lat", "start_lat").withColumnRenamed("long", "start_long")
end_points = points_df.withColumnRenamed("lat", "end_lat").withColumnRenamed("long", "end_long")
start_end_points = start_points.crossJoin(end_points).filter(F.monotonically_increasing_id() < NUM_POINTS)

rides_df = drivers_df.crossJoin(clients_df).crossJoin(start_end_points)
price_col = 1.5*((F.col("start_lat") - F.col("end_lat"))**2 + (F.col("start_long") - F.col("end_long"))**2)**0.5

start_time = (time() - F.rand()*31536000).cast(T.IntegerType())
end_time = (F.col("start_time") + F.rand()*7200).cast(T.IntegerType())
final_rides_df = rides_df \
    .withColumn("price", price_col) \
    .withColumn("start_time", start_time) \
    .withColumn("end_time", F.to_timestamp(end_time)) \
    .withColumn("start_time", F.to_timestamp(start_time)) \
    .withColumn("grade", (F.rand()*5).cast(T.IntegerType())) \
    .withColumn("to_client_grade", (F.rand()*5).cast(T.IntegerType())) \
    .withColumn("comment_type", (F.rand()*2).cast(T.IntegerType())) \
    .withColumn("comment_type", F.when(F.col("comment_type") == 0, "compliant")
               .otherwise("compliment"))

### Data example

In [7]:
# final_rides_df.show()

### 1. Top 100 drivers

In [8]:
top_100_d = final_rides_df.groupBy("driver").agg(F.avg("grade").alias("avg_grade")).orderBy(F.desc("avg_grade"))
top_100_d.show(100)

21/11/29 16:31:54 WARN TaskSetManager: Stage 8 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+-----------+------------------+
|     driver|         avg_grade|
+-----------+------------------+
|     Nicole|          2.035625|
|   Nicholas| 2.035416666666667|
|    Charles|             2.035|
|   Kimberly|2.0347916666666666|
|   Margaret|2.0345833333333334|
|  Catherine|2.0339583333333335|
|      Donna|2.0327083333333333|
|      James|             2.025|
|     Amanda|2.0239583333333333|
|    Matthew|2.0227083333333336|
|     Joseph| 2.020208333333333|
|      Carol|          2.019375|
|  Christine|2.0191666666666666|
|     Robert| 2.017708333333333|
|      Maria|           2.01625|
|      Janet|          2.015625|
|       Emma|2.0145833333333334|
|      Betty|2.0129166666666665|
|       Eric|           2.01125|
|       Gary|              2.01|
|      Larry|              2.01|
|   Jonathan|           2.00875|
|     Ronald|           2.00875|
|     Justin|2.0085416666666664|
|      Jason|          2.004375|
|    Stephen|          2.004375|
|    Deborah|2.0039583333333333|
|       Ry

                                                                                

### 2. Bad drivers

In [9]:
bad_dudes = final_rides_df.groupBy("driver").agg(F.avg("grade").alias("avg_grade")).filter(F.col("avg_grade") < 3.5)
bad_dudes.show()

21/11/29 16:36:32 WARN TaskSetManager: Stage 16 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+-----------+------------------+
|     driver|         avg_grade|
+-----------+------------------+
|      James|             2.025|
|    Matthew|2.0227083333333336|
|Christopher|1.9747916666666667|
|    Michael|1.9889583333333334|
|    Charles|             2.035|
|       John|           2.00125|
|    Richard|1.9958333333333333|
|     Joseph| 2.020208333333333|
|     Daniel|1.9958333333333333|
|      David|          1.998125|
|     Thomas|1.9889583333333334|
|     Robert| 2.017708333333333|
|    William|2.0004166666666667|
|     Amanda|2.0239583333333333|
|      Carol|          2.019375|
|      Donna|2.0327083333333333|
|    Melissa|1.9902083333333334|
|    Deborah|2.0039583333333333|
|   Michelle|1.9904166666666667|
|   Margaret|2.0345833333333334|
+-----------+------------------+
only showing top 20 rows



                                                                                

### 3. Most riding time

In [11]:
most_riding = final_rides_df.withColumn("date", F.to_date("start_time")) \
    .withColumn("time_str", F.date_format('start_time', 'HH:mm:ss')) \
    .withColumn("time_by_0_ts", F.expr("unix_timestamp(CONCAT('01-01-1970', time_str), 'MM-dd-yyyyHH:mm:ss')")).select(
    F.date_format(F.to_timestamp(F.avg("time_by_0_ts")), 'HH:mm:ss').alias("avg_time"))

In [12]:
most_riding.select("avg_time").show()

21/11/29 16:40:53 WARN TaskSetManager: Stage 24 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+--------+
|avg_time|
+--------+
|12:00:37|
+--------+



                                                                                

### 4. Top 50 clients

In [14]:
top50_clients = final_rides_df.groupBy("client").avg("to_client_grade").orderBy(F.desc("avg(to_client_grade)"))

In [15]:
top50_clients.show()

21/11/29 16:45:39 WARN TaskSetManager: Stage 32 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+---------+--------------------+
|   client|avg(to_client_grade)|
+---------+--------------------+
|  Rebecca|   2.036153846153846|
|   George|  2.0332692307692306|
|   Andrew|   2.026346153846154|
|  Gregory|   2.026153846153846|
| Benjamin|  2.0246153846153847|
| Patricia|  2.0215384615384617|
|    Helen|              2.0175|
|   Sharon|  2.0171153846153844|
|     Mark|   2.016346153846154|
|  Jessica|   2.015769230769231|
|   Dennis|  2.0151923076923075|
|     Mary|  2.0136538461538462|
|   Donald|  2.0136538461538462|
|    Frank|  2.0103846153846154|
|  Cynthia|  2.0073076923076925|
|Elizabeth|   2.006346153846154|
|  Brandon|   2.006346153846154|
|    Sarah|   2.005576923076923|
|Stephanie|  2.0048076923076925|
|   Joshua|   2.004423076923077|
+---------+--------------------+
only showing top 20 rows



                                                                                

### 5. Top 5 money makers

In [17]:
mn_mks = final_rides_df.groupBy("driver").agg(F.sum("price").alias("sum")).orderBy(F.desc("sum"))

In [18]:
mn_mks.show()

21/11/29 16:49:53 WARN TaskSetManager: Stage 40 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+-----------+-----------------+
|     driver|              sum|
+-----------+-----------------+
|      James|943.1574232488217|
|    Matthew|943.1574232488217|
|Christopher|943.1574232488217|
|    Michael|943.1574232488217|
|    Charles|943.1574232488217|
|       John|943.1574232488217|
|    Richard|943.1574232488217|
|     Joseph|943.1574232488217|
|     Daniel|943.1574232488217|
|      David|943.1574232488217|
|     Thomas|943.1574232488217|
|     Robert|943.1574232488217|
|    William|943.1574232488217|
|     Amanda|943.1574232488217|
|      Carol|943.1574232488217|
|      Donna|943.1574232488217|
|    Melissa|943.1574232488217|
|    Deborah|943.1574232488217|
|   Michelle|943.1574232488217|
|   Margaret|943.1574232488217|
+-----------+-----------------+
only showing top 20 rows



                                                                                

### 6. Night

In [24]:
bat_guys = final_rides_df.withColumn("date", F.to_date("start_time")) \
    .withColumn("time_str", F.date_format('start_time', 'HH:mm:ss')) \
    .withColumn("time_by_0_ts", F.expr("unix_timestamp(CONCAT('01-01-1970', time_str), 'MM-dd-yyyyHH:mm:ss')")) \
    .groupBy("driver").agg(F.avg("time_by_0_ts").alias("time_by_0_ts")) \
    .filter(F.col("time_by_0_ts").cast(T.IntegerType()) > 72e3) \
    .select("driver", F.date_format(F.to_timestamp("time_by_0_ts"), 'HH:mm:ss').alias("avg time"))

In [25]:
bat_guys.show()

21/11/29 18:45:16 WARN TaskSetManager: Stage 54 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+------+--------+
|driver|avg time|
+------+--------+
+------+--------+



                                                                                

### 7. Good guys

In [31]:
best_guys = final_rides_df.filter(F.col("comment_type") == "compliment").groupBy("driver").agg(F.count("comment_type").alias("compliment_num"))

In [34]:
best_guys.orderBy(F.desc("compliment_num")).show()

21/11/29 19:08:34 WARN TaskSetManager: Stage 86 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+---------+--------------+
|   driver|compliment_num|
+---------+--------------+
| Margaret|          2487|
|    Debra|          2487|
| Kimberly|          2470|
|  Richard|          2461|
|    Jason|          2448|
|     Emma|          2440|
|     Eric|          2439|
|   Daniel|          2438|
|Catherine|          2438|
| Jonathan|          2434|
|  Timothy|          2429|
|   Ashley|          2429|
| Nicholas|          2424|
| Samantha|          2417|
|    Betty|          2417|
|   Pamela|          2416|
|     John|          2415|
|  Jeffrey|          2411|
|   Ronald|          2410|
|Katherine|          2409|
+---------+--------------+
only showing top 20 rows



                                                                                

### 8. Bad bad guys

In [32]:

jerks = final_rides_df.filter(F.col("comment_type") != "compliment").groupBy("driver").agg(F.count("comment_type").alias("complient_num"))

In [33]:
jerks.show()

21/11/29 19:03:31 WARN TaskSetManager: Stage 78 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.

+-----------+-------------+
|     driver|complient_num|
+-----------+-------------+
|      James|         2443|
|    Matthew|         2394|
|Christopher|         2407|
|    Michael|         2446|
|    Charles|         2452|
|       John|         2385|
|    Richard|         2339|
|     Joseph|         2407|
|     Daniel|         2362|
|      David|         2411|
|     Thomas|         2441|
|     Robert|         2424|
|    William|         2416|
|     Amanda|         2423|
|      Carol|         2420|
|      Donna|         2415|
|    Melissa|         2434|
|    Deborah|         2396|
|   Michelle|         2392|
|   Margaret|         2313|
+-----------+-------------+
only showing top 20 rows



                                                                                

### 9. Longest names

In [39]:
longest = final_rides_df.select("driver", F.length("driver").alias("length")).distinct().orderBy(F.desc("length"))

In [40]:
longest.show()

21/11/29 19:40:26 WARN TaskSetManager: Stage 100 contains a task of very large size (11334 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-----------+------+
|     driver|length|
+-----------+------+
|Christopher|    11|
|  Christine|     9|
|  Catherine|     9|
|  Katherine|     9|
|   Michelle|     8|
|   Samantha|     8|
|   Kimberly|     8|
|   Margaret|     8|
|   Nicholas|     8|
|   Jonathan|     8|
|    Deborah|     7|
|    Dorothy|     7|
|    Melissa|     7|
|    William|     7|
|    Charles|     7|
|    Matthew|     7|
|    Michael|     7|
|    Stephen|     7|
|    Timothy|     7|
|    Richard|     7|
+-----------+------+
only showing top 20 rows



                                                                                