In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

In [2]:
# Point PySpark (driver and workers) at the system Python 3 interpreter and
# tell Spark where the Hadoop/YARN client configuration lives.
os.environ.update(
    {
        "PYSPARK_PYTHON": "/usr/bin/python3",
        "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
        "HADOOP_CONF_DIR": "/etc/hadoop/conf",
        "YARN_CONF_DIR": "/etc/hadoop/conf",
    }
)

In [25]:
# DEV constants: inputs and output live under the developer's HDFS home.
EVENTS_DIR = "/user/solovyovyu/data/geo/events"  # parquet events (read below)
GEO_DIR = "/user/solovyovyu/geo.csv"  # ';'-delimited city reference file
OUT_PATH = "/user/solovyovyu/analytics"  # root directory for the written marts

# PROD constants: uncomment to read events from the shared location instead.
# EVENTS_DIR = "/user/master/data/geo/events"

In [4]:
# Create (or reuse) the YARN-backed Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Mart User")
    .getOrCreate()
)

25/01/24 13:45:13 WARN Utils: Your hostname, fhm2ndk4p3f59ulg8jqk resolves to a loopback address: 127.0.1.1; using 172.16.0.39 instead (on interface eth0)
25/01/24 13:45:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/24 13:45:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/01/24 13:45:16 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [5]:
# City reference table. The CSV is ';'-delimited and writes coordinates with a
# decimal comma, so the comma is swapped for a dot before casting to double.
# The coordinate columns are renamed so they do not collide with the events'
# own lat/lon columns after the join.
geo_raw = (
    spark.read
    .options(delimiter=";", header=True)
    .csv(GEO_DIR)
)

geo_df = (
    geo_raw
    .withColumnRenamed("lat", "geo_lat")
    .withColumnRenamed("lng", "geo_lon")
    .withColumn("geo_lat", F.regexp_replace("geo_lat", ",", ".").cast(DoubleType()))
    .withColumn("geo_lon", F.regexp_replace("geo_lon", ",", ".").cast(DoubleType()))
)

25/01/24 13:45:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container from a bad node: container_1729228596238_15859_01_000003 on host: rc1a-dataproc-d-jysa8rey4rqmgffj.mdb.yandexcloud.net. Exit status: 137. Diagnostics: [2025-01-24 13:45:51.965]Container killed on request. Exit code is 137
[2025-01-24 13:45:51.965]Container exited with a non-zero exit code 137. 
[2025-01-24 13:45:51.965]Killed by external signal
.
                                                                                

In [7]:
# Preview the parsed city table (first 20 rows, long values truncated).
geo_df.show(n=20, truncate=True)

+---+----------+--------+--------+
| id|      city| geo_lat| geo_lon|
+---+----------+--------+--------+
|  1|    Sydney| -33.865|151.2094|
|  2| Melbourne|-37.8136|144.9631|
|  3|  Brisbane|-27.4678|153.0281|
|  4|     Perth|-31.9522|115.8589|
|  5|  Adelaide|-34.9289|138.6011|
|  6|Gold Coast|-28.0167|   153.4|
|  7|Cranbourne|-38.0996|145.2834|
|  8|  Canberra|-35.2931|149.1269|
|  9| Newcastle|-32.9167|  151.75|
| 10|Wollongong|-34.4331|150.8831|
| 11|   Geelong|  -38.15|  144.35|
| 12|    Hobart|-42.8806| 147.325|
| 13|Townsville|-19.2564|146.8183|
| 14|   Ipswich|-27.6167|152.7667|
| 15|    Cairns|-16.9303|145.7703|
| 16| Toowoomba|-27.5667|  151.95|
| 17|    Darwin|-12.4381|130.8411|
| 18|  Ballarat|  -37.55|  143.85|
| 19|   Bendigo|  -36.75|144.2667|
| 20|Launceston|-41.4419| 147.145|
+---+----------+--------+--------+
only showing top 20 rows



In [6]:
# Raw events, stored as parquet under EVENTS_DIR.
events_df = spark.read.format("parquet").load(EVENTS_DIR)

[Stage 1:>                                                          (0 + 1) / 1]                                                                                

In [7]:
def get_distance(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Build a Spark Column with the haversine distance between two points.

    This function composes Spark SQL functions, so it must be called with
    Column objects while building a DataFrame expression; it is not a
    row-level Python function.

    Args:
        lat1: Column with the first point's latitude, in degrees.
        lon1: Column with the first point's longitude, in degrees.
        lat2: Column with the second point's latitude, in degrees.
        lon2: Column with the second point's longitude, in degrees.
        radius_km: Sphere radius the arc is scaled by. Defaults to 6371,
            the commonly used mean Earth radius in kilometres.

    Returns:
        A Column holding the distance in the units of ``radius_km``.
    """
    d_lat = F.radians(lat2 - lat1)
    d_lon = F.radians(lon2 - lon1)

    sin_half_lat = F.sin(d_lat / 2)
    sin_half_lon = F.sin(d_lon / 2)

    # Haversine term: hav(d_lat) + cos(lat1) * cos(lat2) * hav(d_lon).
    a = (
        sin_half_lat * sin_half_lat
        + F.cos(F.radians(lat1)) * F.cos(F.radians(lat2)) * sin_half_lon * sin_half_lon
    )
    c = 2 * F.atan2(F.sqrt(a), F.sqrt(1 - a))
    return radius_km * c


def _get_distance_py(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Row-level haversine distance on plain Python numbers (used by the UDF).

    Returns None when any coordinate is missing, so the UDF yields NULL
    instead of raising on the executors.
    """
    import math  # local import: `math` is not imported at the top of the notebook

    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None

    d_lat = math.radians(lat2 - lat1)
    d_lon = math.radians(lon2 - lon1)
    a = (
        math.sin(d_lat / 2) ** 2
        + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lon / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1]; clamp so sqrt() cannot raise.
    a = min(1.0, max(0.0, a))
    return radius_km * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


# A Python UDF receives plain Python values, not Columns, so it has to wrap the
# pure-Python implementation. Wrapping `get_distance` (which calls F.radians,
# F.sin, ...) would fail as soon as the UDF was evaluated.
get_distance_udf = F.udf(_get_distance_py, DoubleType())

25/01/24 13:46:04 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 3 for reason Container from a bad node: container_1729228596238_15859_01_000004 on host: rc1a-dataproc-d-5w4oxa2s5b8foehs.mdb.yandexcloud.net. Exit status: 137. Diagnostics: [2025-01-24 13:46:01.665]Container killed on request. Exit code is 137
[2025-01-24 13:46:01.665]Container exited with a non-zero exit code 137. 
[2025-01-24 13:46:01.666]Killed by external signal
.


In [14]:
 # Keep only "message" events and flatten the fields needed downstream:
 # message id, sender (as user_id), message timestamp and the event coordinates.
 message_df = (
     events_df
     .filter(F.col("event_type") == "message")
     .select(
         F.col("event.message_id"),
         F.col("event.message_from").alias("user_id"),
         F.col("event.message_ts"),
         F.col("lat"),
         F.col("lon"),
     )
 )

In [16]:
# Pair every located message with every city and compute the distance between
# them; the nearest city is picked in a later step.
message_with_distance = (
    message_df
    # A message without coordinates has a NULL distance to every city, so its
    # "nearest" city would be picked arbitrarily. Drop such messages up front.
    .where(F.col("lat").isNotNull() & F.col("lon").isNotNull())
    # The city table is tiny compared with the events, so hint Spark to
    # broadcast it instead of shuffling the events for the cartesian product.
    .crossJoin(F.broadcast(geo_df))
    .withColumn(
        "distance",
        get_distance(F.col("lat"), F.col("lon"), F.col("geo_lat"), F.col("geo_lon")),
    )
    .select(
        "user_id",
        "message_id",
        "distance",
        "message_ts",
        "city",
    )
)

In [18]:
# Rank the candidate cities of each message by distance, closest first.
window = Window.partitionBy("message_id", "user_id").orderBy("distance")

ranked_candidates = message_with_distance.withColumn("rank", F.row_number().over(window))

# Keep only the closest city for each message.
message_with_city = (
    ranked_candidates
    .where(F.col("rank") == 1)
    .select("message_id", "user_id", "message_ts", "city")
)

In [20]:
# Per-user message timeline. The window must NOT be partitioned by city:
# inside a (user, city) partition the previous row always has the same city,
# so a change of city could never be detected.
window_spec = Window.partitionBy("user_id").orderBy("message_ts")

# Longest qualifying stay first. The most recent stay and then the city name
# break ties, so the chosen home city is deterministic.
best_stay_first = Window.partitionBy("user_id").orderBy(
    F.col("stay_duration").desc(),
    F.col("stay_end").desc(),
    F.col("city").asc(),
)

home_city_df = (
    message_with_city
    .withColumn("prev_city", F.lag("city").over(window_spec))
    # 1 on the user's first message and whenever the city differs from the
    # previous message, 0 otherwise.
    .withColumn(
        "is_new_city",
        F.when(
            F.col("prev_city").isNull() | (F.col("city") != F.col("prev_city")), 1
        ).otherwise(0),
    )
    # The running sum of the change flags numbers each uninterrupted stay.
    .withColumn("group_id", F.sum("is_new_city").over(window_spec))
    # Collapse every uninterrupted stay to its first and last message.
    .groupBy("user_id", "group_id", "city")
    .agg(
        F.min("message_ts").alias("stay_start"),
        F.max("message_ts").alias("stay_end"),
    )
    # Stay length in days, measured between the first and last message of the stay.
    .withColumn("stay_duration", F.datediff(F.col("stay_end"), F.col("stay_start")))
    .filter(F.col("stay_duration") >= 27)
    # Pick one stay per user with a window rank: sorting and then aggregating
    # with first() is not deterministic, because groupBy does not keep the order.
    .withColumn("stay_rank", F.row_number().over(best_stay_first))
    .filter(F.col("stay_rank") == 1)
    .select("user_id", F.col("city").alias("home_city"))
)

In [23]:
# One row per user: the city of the latest message (act_city) plus the home
# city, which is NULL for users without a qualifying stay (left join).
mart_user = (
    message_with_city
    .groupBy("user_id")
    .agg(
        # F.last() after groupBy has no defined order and returns an arbitrary
        # city. Structs compare field by field, so the maximum
        # (message_ts, city) struct belongs to the most recent message.
        F.max(F.struct("message_ts", "city")).getField("city").alias("act_city")
    )
    .join(home_city_df, on="user_id", how="left")
)

In [28]:
# Persist the user mart as parquet, replacing any previous run's output.
mart_user.write.parquet(f"{OUT_PATH}/mart_users", mode="overwrite")

                                                                                