### Описание задачи
Коллеги из другого проекта по просьбе вашей команды начали вычислять координаты событий (сообщений, подписок, реакций, регистраций), которые совершили пользователи соцсети. Значения координат будут появляться в таблице событий. Пока определяется геопозиция только исходящих сообщений, но уже сейчас можно начать разрабатывать новый функционал. 
В продукт планируют внедрить систему рекомендации друзей. Приложение будет предлагать пользователю написать человеку, если пользователь и адресат:

- состоят в одном канале,
- раньше никогда не переписывались,
- находятся не дальше 1 км друг от друга.

При этом команда хочет лучше изучить аудиторию соцсети, чтобы в будущем запустить монетизацию. Для этого было решено провести геоаналитику:

- Выяснить, где находится большинство пользователей по количеству сообщений, лайков и подписок из одной точки.
- Посмотреть, в какой точке Австралии регистрируется больше всего новых пользователей.
- Определить, как часто пользователи путешествуют и какие города выбирают.

Благодаря такой аналитике в соцсеть можно будет вставить рекламу: приложение сможет учитывать местонахождение пользователя и предлагать тому подходящие услуги компаний-партнёров. 

In [3]:
from datetime import datetime, timedelta
from typing  import List
import pyspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import os 
import sys



2022-06-20


In [2]:
os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

In [None]:
"""
    Перекладываем данные по событиям из слоя сырых данных в слой ODS 
    с дополнительным партиционированием по типу события:
    - message
    - reaction
    - subscription
    - activity
"""
spark = SparkSession.builder \
                    .master("yarn") \
                    .appName("EventsPartitioningJob_all_dates") \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .getOrCreate()

"""
    считали слой сырых данных
"""
events = spark.read.parquet("/user/master/data/geo/events") 

"""
    переложили в ODS с партиционирование по дате и типу события
"""
my_events = events.write.option("header",True) \
        .partitionBy("date", "event_type") \
        .mode("overwrite") \
        .parquet("/user/yurgen001/data/geo/events") 

events.show(10)

24/05/28 11:17:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
24/05/28 11:17:21 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

+--------------------+----------+-------------------+------------------+----------+
|               event|event_type|                lat|               lon|      date|
+--------------------+----------+-------------------+------------------+----------+
|{NULL, NULL, 2022...|  reaction|-26.728842867329007|152.90596937369952|2022-05-30|
|{NULL, NULL, 2022...|  reaction| -31.30616755408133|116.38841832283045|2022-05-30|
|{NULL, NULL, 2022...|  reaction|-22.568566129738795|150.53405510107592|2022-05-30|
|{NULL, NULL, 2022...|  reaction|-12.433534102119754|131.35071038228722|2022-05-30|
|{NULL, NULL, 2022...|  reaction|-12.377345240540057|131.51896908518478|2022-05-30|
|{NULL, NULL, 2022...|  reaction|-26.967101428948645|153.78804052958333|2022-05-30|
|{NULL, NULL, 2022...|  reaction|-11.685460691926384| 130.8833764307162|2022-05-30|
|{NULL, NULL, 2022...|  reaction| -26.74671387818525|152.08417038964188|2022-05-30|
|{NULL, NULL, 2022...|  reaction| -26.94158521928808|152.92817870766552|2022

In [None]:
events.printSchema()
# геопозиция только для исходящих сообщений

root
 |-- event: struct (nullable = true)
 |    |-- admins: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- channel_id: long (nullable = true)
 |    |-- datetime: string (nullable = true)
 |    |-- media: struct (nullable = true)
 |    |    |-- media_type: string (nullable = true)
 |    |    |-- src: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- message_channel_to: long (nullable = true)
 |    |-- message_from: long (nullable = true)
 |    |-- message_group: long (nullable = true)
 |    |-- message_id: long (nullable = true)
 |    |-- message_to: long (nullable = true)
 |    |-- message_ts: string (nullable = true)
 |    |-- reaction_from: string (nullable = true)
 |    |-- reaction_type: string (nullable = true)
 |    |-- subscription_channel: long (nullable = true)
 |    |-- subscription_user: string (nullable = true)
 |    |-- tags: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |   

### Шаг 2. Создать витрину в разрезе пользователей

Определите, в каком городе было совершено событие. С этим вам поможет список городов из файла geo.csv. В нём указаны координаты центра города. 
Найдите расстояние от координаты отправленного сообщения до центра города. Событие относится к тому городу, расстояние до которого наименьшее.
В этом случае расстояние вычисляется по формуле:
   
d=2rarcsin⁡⁡(sin⁡2(φ2−φ12)+cos⁡⁡(φ1)cos⁡⁡(φ2)sin⁡2(λ2−λ12)).d=2rarcsin⁡(sin2(2φ2​−φ1​​)+cos⁡(φ1​)cos⁡​(φ2​)sin2(2λ2​−λ1​​)​).

где:
φ1φ1​ — широта первой точки;
φ2φ2​ — широта второй точки;
λ1λ1​ — долгота первой точки;
λ2λ2​ — долгота второй точки;
rr — радиус Земли, примерно равный 6371 км.

Это формула для расчёта длины пути между двумя точками на сфере.
Рекомендуем вам сначала самим попробовать перевести её в PySpark. Если не получится, воспользуйтесь подсказкой.

Когда вы вычислите геопозицию каждого отправленного сообщения, создайте витрину с тремя полями: 
    
- user_id — идентификатор пользователя.
- act_city — актуальный адрес. Это город, из которого было отправлено последнее сообщение.
- home_city — домашний адрес. Это последний город, в котором пользователь был дольше 27 дней.

Выясните, сколько пользователь путешествует. Добавьте в витрину два поля:
    
- travel_count — количество посещённых городов. Если пользователь побывал в каком-то городе повторно, то это считается за отдельное посещение.
- travel_array — список городов в порядке посещения.

Добавьте в витрину последний атрибут — местное время (local_time) события (сообщения или других событий, если вы их разметите на основе сообщений). Местное время события — время последнего события пользователя, о котором у нас есть данные с учётом таймзоны геопозициии этого события. Данные, которые вам при этом пригодятся:
    
TIME_UTC — время в таблице событий. Указано в UTC+0.
timezone — актуальный адрес. Атрибуты содержатся в виде Australia/Sydney.

In [None]:
"""
    Отладка формулы расчета расстояния между двумя точками  по их широте и долготе
"""
import math
 
earth_radius = 6371 # в километрах
lat_1 = -27.07407468786464
lon_1 = 153.2731738967999
lat_2 = -42.8806
lon_2 = 147.325

PI=3.14159265
distance = 2 * earth_radius * math.asin(math.sqrt(math.pow(math.sin((math.radians(lat_1 - lat_2)) / 2), 2) + math.cos(math.radians(lat_1)) \
    * math.cos(math.radians(lat_2)) * math.pow(math.sin(math.radians(lon_1 - lon_2) / 2), 2))) # тоже в километрах

# distance = 2 * earth_radius * math.asin(math.sqrt(math.pow(math.sin((lat_1 - lat_2) / 2), 2) + math.cos(lat_1) \
#     * math.cos(lat_2) * math.pow(math.sin((lon_1 - lon_2) / 2), 2))) # тоже в километрах
print(distance)
# messages_with_distance = messages.withColumn("distance", (F.sqrt((F.sin(F.col('message_lat') - F.col('city_lat')) / F.lit(2)) + F.cos(F.col('message_lat')) * F.cos(F.col('city_lat')) * F.pow(F.sin(F.col('message_lon') - F.col('city_lon')) / F.lit(2), F.lit(2))))
# )



1837.9808618904044


In [None]:
from datetime import datetime, timedelta
from typing  import List
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import os 
import sys

os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

spark = SparkSession.builder \
                    .master("yarn") \
                    .appName("datamartByUsersJob") \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .getOrCreate()


"""
    формирование списка партиций по дате на заданную глубину
"""
def input_paths(date: str, depth: int, base_path: str) -> List:
    dt = datetime.strptime(date, '%Y-%m-%d').date()
    path_list = [base_path + '/date=' + (dt - timedelta(days=i)).strftime('%Y-%m-%d')
                 for i in range(depth)
                ]
    return path_list




def datamart_by_users(base_path:str, date: str, depth: int, spark: SparkSession) -> DataFrame:

    DAYS_COUNT = 27 # время непрерывного нахождения для определения домашнего города
    EARTH_RADIUS = 6371

    """
        Формируем и вычитываем список партиций на заданную глубину с заданной даты
    """
    events_path = input_paths(date, depth, base_path)
#     print(events_path)
    events = spark.read.option('basePath', base_path).parquet(*events_path)
#     events.printSchema()

    """
        считываем файл geo.csv с координатами городов
        преобразуем долготу и широту из строкового типа в числовой
    """
    schema_geo_csv = StructType([ 
        StructField("id",IntegerType(),True), 
        StructField("city",StringType(),True), 
        StructField("lat",StringType(),True), 
        StructField("lng",StringType(),True)
    ])    
    
    geo_city = spark.read.options(delimiter=";", header=True) \
                         .schema(schema_geo_csv) \
                         .csv("/user/yurgen001/data/snapshots/geo_city") \
                         .withColumn("lat", F.regexp_replace("lat", ",", ".").cast(DoubleType())) \
                         .withColumn("lng", F.regexp_replace("lng", ",", ".").cast(DoubleType()))
#     geo_city.printSchema()
#     geo_city.show(10)

       
    """
        выделяем id пользователя, как id отправителя сообщения (координаты внедрены только для событий типа исходящих сообщений)
        джойним полученное со списком городов как декартово произведение все со всеми,
        чтобы в дальнейшем посчитать дистанцию до всех городов и выбрать минимальную
    """
    messages = events.where("event_type = 'message' AND event.message_ts IS NOT NULL") \
                     .selectExpr("event.message_from AS user_id",
                                 "event.message_ts AS message_dt",
                                 "lat AS message_lat",
                                 "lon AS message_lon") \
                     .crossJoin(geo_city.withColumnRenamed("lat", "city_lat").withColumnRenamed("lng", "city_lon")) 

#     messages.show()
  

    """
        добавляем поле дистанции до города, таким образом у нас есть заготовка с дистанциями до всех городов списка    
    """
    messages_with_distance = messages.withColumn("distance", 2 * EARTH_RADIUS * F.asin(F.sqrt(F.pow(F.sin((F.radians('message_lat') - F.radians('city_lat')) / F.lit(2)), F.lit(2)) \
                    + F.cos(F.radians('message_lat')) * F.cos(F.radians('city_lat')) * F.pow(F.sin((F.radians('message_lon') - F.radians('city_lon')) / F.lit(2)), F.lit(2))))
                                                )
    
#     messages_with_distance.show()
    # distance_rank_window = Window().partitionBy("user_id", "message_dt").orderBy("distance")
    # messages_with_city = messages_with_distance.select("user_id", "message_dt", "city", "distance") \
    #                                            .withColumn("distance_rank", F.row_number().over(distance_rank_window)) \
    #                                            .where("distance_rank = 1") \
    #                                            .selectExpr("user_id",
    #                                                        "message_dt",
    #                                                        "city")

    """
        Найдем город отправки сообщения (это город, дистанция до которого минимальна)
     
    """
    distance_rank_window = Window().partitionBy("user_id", "message_dt").orderBy("distance")
    messages_with_city = messages_with_distance.select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city",
                                                       "distance") \
                                               .withColumn("distance_rank", F.row_number().over(distance_rank_window)) \
                                               .where("distance_rank = 1") \
                                               .select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city")


    messages_with_city.cache()
#     messages_with_city.show()

    """
        Находим актуальный адрес
        город, из которого было отправлено последнее сообщение
    """
    last_dt_window = Window().partitionBy("user_id").orderBy(F.desc("message_dt"))
    # user_act_city = messages_with_city.withColumn("dt_rank", F.row_number().over(last_dt_window)) \
    #                                   .where("dt_rank = 1") \
    #                                   .selectExpr("user_id", "city AS act_city", \
    #                                               "CONCAT('Australia/', city) AS time_zone", \
    #                                               "FROM_UTC_TIMESTAMP(message_dt, CONCAT('Australia/', 'Sydney')) AS local_time"                            
    #                                              ) 

    user_act_location = messages_with_city.withColumn("dt_rank", F.row_number().over(last_dt_window)) \
                                          .where("dt_rank = 1") \
                                          .selectExpr("user_id",
                                                      "message_lat AS act_lat",
                                                      "message_lon AS act_lon",
                                                      "city AS act_city", 
                                                      "CONCAT('Australia/', city) AS time_zone",
                                                      "FROM_UTC_TIMESTAMP(message_dt, CONCAT('Australia/', 'Sydney')) AS local_time"                            
                                                      ) 


#     user_act_location.show(10, False)
    """
        делаем заготовку для поиска домашнего города и списка посещенных городов
        для этого нам надо знать сколько дней пользователь провел в каждом городе последовательно
        делаем два столбца row_number один сгруппирован только по пользователю а второй по пользователю и по городу
        пока пользователь в городе разность между столбцами постоянна, при смене города она меняется и служит номером визита в город 
        нумерация начинается с нуля (нашел решение на стековерфлоу)
    """
    seq_num_window = Window().partitionBy("user_id").orderBy("message_dt")
    seq_num_city_window = Window().partitionBy("user_id", "city").orderBy("message_dt")
    user_city_visits = messages_with_city.withColumn("seq_num", F.row_number().over(seq_num_window)) \
                                         .withColumn("seq_num_city", F.row_number().over(seq_num_city_window)) \
                                         .withColumn("visit_num", F.col("seq_num") - F.col("seq_num_city"))

#     user_city_visits.show(40, False)
    """
        считаем длительность каждого визита пользователя в город в днях, метки начала и конца визита по 
        минимальной и максимальной метке отправки сообщения 
    """
    user_city_visits = user_city_visits.groupBy("user_id", "city", "visit_num") \
                                       .agg(F.min("message_dt").alias("start_visit_dt"), \
                                            F.max("message_dt").alias("end_visit_dt"), \
                                           ) \
                                       .withColumn("visit_day_count", F.datediff(F.to_date("end_visit_dt"), F.to_date("start_visit_dt")))
#     user_city_visits.show(40, False)

    """
        в проекте эти вычисления нужны для всех трех витрин и будут выделены в отдельный модуль
    """

    """
        Находим последний город, в котором пользователь провел не менее 27 дней (домашний адрес)
    """
    home_city_window = Window().partitionBy("user_id").orderBy(F.desc("start_visit_dt"))
    user_home_city = user_city_visits.where(f"visit_day_count >= {DAYS_COUNT}") \
                                     .withColumn("home_city_rank", F.row_number().over(home_city_window)) \
                                     .where("home_city_rank = 1") \
                                     .selectExpr("user_id",
                                                 "city AS home_city")             
#     user_home_city.show()
    """
        Считаем для каждого пользователя число посещенных городов и формируем список городов в порядке посещения
    """
    user_travel = user_city_visits.orderBy("start_visit_dt") \
                                  .groupBy("user_id") \
                                  .agg(F.count("city").alias("travel_count"), \
                                       F.collect_list("city").alias("travel_array")
                                      )                     
#     user_travel.show(40, False)
    """
        Собираем витрину по пользователю
    """
    user_datamart = user_act_location.join(user_travel, "user_id", "left") \
                                 .join(user_home_city, "user_id", "left") \
                                 .select("user_id",
                                         "act_city",
                                         "home_city",
                                         "travel_count",
                                         "travel_array",
                                         "local_time")
    user_datamart.show()

    return user_datamart
    
    # user_travel_array = user_city_visits.orderBy("start_visit_dt") \
    #                                     .groupBy("user_id") \
    #                                     .agg(F.collect_list("city").alias("travel_array"))

if __name__ == '__main__':
    datamart_by_users("/user/yurgen001/data/geo/events", '2022-05-31', 60, spark)#.repartition(1) \
        #.write.mode('overwrite').parquet('/user/yurgen001/data/analytics/dm_by_user_d60')

24/05/23 10:38:48 WARN CacheManager: Asked to cache already cached data.
                                                                                

+-------+----------+---------+------------+--------------------+--------------------+
|user_id|  act_city|home_city|travel_count|        travel_array|          local_time|
+-------+----------+---------+------------+--------------------+--------------------+
|     43|Cranbourne|     NULL|           1|        [Cranbourne]|2021-05-11 14:47:...|
|     57|    Darwin|     NULL|           1|            [Darwin]|2021-05-09 18:04:...|
|    190|     Perth|     NULL|           1|             [Perth]|2021-04-26 10:06:...|
|    198|    Mackay|     NULL|           3|[Mackay, Bendigo,...|2021-05-31 15:26:...|
|    243|Townsville|     NULL|           1|        [Townsville]|2021-06-02 02:50:...|
|    370|  Adelaide|     NULL|           1|          [Adelaide]|2021-04-30 23:04:...|
|    442|    Mackay|     NULL|           7|[Mackay, Geelong,...| 2021-05-06 17:20:21|
|    442|    Mackay|     NULL|           7|[Mackay, Geelong,...| 2021-05-06 17:20:21|
|    487|    Cairns|     NULL|           1|           

### Шаг 3. Создать витрину в разрезе зон
Вы создадите геослой — найдёте распределение атрибутов, связанных с событиями, по географическим зонам (городам). Если проанализировать этот слой, то можно понять поведение пользователей по различным регионам. 
Итак, вам нужно посчитать количество событий в конкретном городе за неделю и месяц. Значит, витрина будет содержать следующие поля:

- month — месяц расчёта;
- week — неделя расчёта;
- zone_id — идентификатор зоны (города);
- week_message — количество сообщений за неделю;
- week_reaction — количество реакций за неделю;
- week_subscription — количество подписок за неделю;
- week_user — количество регистраций за неделю;
- month_message — количество сообщений за месяц;
- month_reaction — количество реакций за месяц;
- month_subscription — количество подписок за месяц;
- month_user — количество регистраций за месяц.

В этой витрине вы учитываете не только отправленные сообщения, но и другие действия — подписки, реакции, регистрации (рассчитываются по первым сообщениям). Пока присвойте таким событиям координаты последнего отправленного сообщения конкретного пользователя.

In [None]:
from datetime import datetime, timedelta
from typing  import List
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import os 
import sys

os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

spark = SparkSession.builder \
                    .master("yarn") \
                    .appName("datamartByZoneJob") \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .getOrCreate()


def input_paths(date: str, depth: int, base_path: str) -> List:
    dt = datetime.strptime(date, '%Y-%m-%d').date()
    path_list = [base_path + '/date=' + (dt - timedelta(days=i)).strftime('%Y-%m-%d')
                 for i in range(depth)
                ]
    return path_list


def datamart_by_zone(base_path:str, date: str, depth: int, spark: SparkSession) -> DataFrame:

    EARTH_RADIUS = 6371

    events_path = input_paths(date, depth, base_path)
#     print(events_path)
    events = spark.read.option('basePath', base_path).parquet(*events_path)
#     events.printSchema()
    schema_geo_csv = StructType([ 
        StructField("id",IntegerType(),True), 
        StructField("city",StringType(),True), 
        StructField("lat",StringType(),True), 
        StructField("lng",StringType(),True)
    ])    
    
    geo_city = spark.read.options(delimiter=";", header=True) \
                         .schema(schema_geo_csv) \
                         .csv("/user/yurgen001/data/snapshots/geo_city") \
                         .withColumn("lat", F.regexp_replace("lat", ",", ".").cast(DoubleType())) \
                         .withColumn("lng", F.regexp_replace("lng", ",", ".").cast(DoubleType()))
#     geo_city.printSchema()
#     geo_city.show(10)

  
    messages = events.where("event_type = 'message' AND event.message_ts IS NOT NULL") \
                     .selectExpr("event.message_from AS user_id", "event.message_ts AS message_dt", "lat AS message_lat", "lon AS message_lon") \
                     .crossJoin(geo_city.withColumnRenamed("lat", "city_lat") \
                                        .withColumnRenamed("lng", "city_lon")) 

#     messages.show()
  
    messages_with_distance = messages.withColumn("distance", 2 * EARTH_RADIUS * F.asin(F.sqrt(F.pow(F.sin((F.radians('message_lat') - F.radians('city_lat')) / F.lit(2)), F.lit(2)) \
                    + F.cos(F.radians('message_lat')) * F.cos(F.radians('city_lat')) * F.pow(F.sin((F.radians('message_lon') - F.radians('city_lon')) / F.lit(2)), F.lit(2))))
                                                )
    
#     messages_with_distance.show()
    # distance_rank_window = Window().partitionBy("user_id", "message_dt").orderBy("distance")
    # messages_with_city = messages_with_distance.select("user_id", "message_dt", "city", "distance") \
    #                                            .withColumn("distance_rank", F.row_number().over(distance_rank_window)) \
    #                                            .where("distance_rank = 1") \
    #                                            .selectExpr("user_id",
    #                                                        "message_dt",
    #                                                        "city")

    distance_rank_window = Window().partitionBy("user_id", "message_dt").orderBy("distance")
    messages_with_city = messages_with_distance.select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city",
                                                       "distance") \
                                               .withColumn("distance_rank", F.row_number().over(distance_rank_window)) \
                                               .where("distance_rank = 1") \
                                               .select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city")


    messages_with_city.cache()
#     messages_with_city.show()


    
    last_dt_window = Window().partitionBy("user_id").orderBy(F.desc("message_dt"))
    # user_act_city = messages_with_city.withColumn("dt_rank", F.row_number().over(last_dt_window)) \
    #                                   .where("dt_rank = 1") \
    #                                   .selectExpr("user_id", "city AS act_city")


    user_act_location = messages_with_city.withColumn("dt_rank", F.row_number().over(last_dt_window)) \
                                          .where("dt_rank = 1") \
                                          .selectExpr("user_id",
                                                      "message_lat AS act_lat",
                                                      "message_lon AS act_lon",
                                                      "city AS act_city", 
                                                      "CONCAT('Australia/', city) AS time_zone",
                                                      "FROM_UTC_TIMESTAMP(message_dt, CONCAT('Australia/', 'Sydney')) AS local_time"                            
                                                      ) 

#     user_act_city.show()

    """
        _______________________________________________________________________________________
        до этого места вычиления те же, что и в первой витрине
    """
    """
        Формируем данные по сообщениям  из уже предобработанного датасета 
    """
    out_messages = messages_with_city.selectExpr("TO_DATE(message_dt) AS event_date",
                                                 "'message' AS event_type",
                                                 "city AS zone_id")
    
#     out_messages.show()

    """
        Формируем данные по реакциям из основной таблицы событий
        зону берем из текущего положения пользователя
    """
    out_reactions = events.where("event_type = 'reaction'") \
                      .selectExpr("CAST(event.reaction_from AS LONG) AS user_id",
                                  "TO_DATE(event.datetime) AS event_date") \
                      .join(user_act_location, "user_id", "inner") \
                      .selectExpr("event_date",
                                  "'reaction' AS event_type",
                                  "act_city AS zone_id")
    
#     out_reactions.printSchema()
#     out_reactions.show()
    
    """
        Формируем данные по подпискам из основной таблицы событий
        зону берем из текущего положения пользователя
    """

    out_subscriptions = events.where("event_type = 'subscription'") \
                              .selectExpr("CAST(event.user AS LONG) AS user_id",
                                          "TO_DATE(event.datetime) AS event_date") \
                              .join(user_act_location, "user_id", "inner") \
                              .selectExpr("event_date",
                                          "'subscription' AS event_type",
                                          "act_city AS zone_id")
    
#     out_subscriptions.printSchema()
#     out_subscriptions.show()
    """
        Собираем таблицу первых сообщений пользователя (в каком городе оно отпаравлено) 
        эти координаты мы присвоим событию регистрации поскольку геопозиционирование 
        у нас пока реализовано только для отправленных сообщений
    """
    first_dt_window = Window().partitionBy("user_id").orderBy("message_dt")
    user_first_message = messages_with_city.withColumn("dt_rank", F.row_number().over(first_dt_window)) \
                                          .where("dt_rank = 1") \
                                          .selectExpr("user_id",
                                                      "message_dt",
                                                      "city"
                                                      ) 
    """
        Формируем данные по регистрациям из таблицы первых сообщений пользователя
    """

    out_registrations = user_first_message.selectExpr("TO_DATE(message_dt) AS event_date",
                                                      "'registration' AS event_type",
                                                      "city AS zone_id")

#     out_registrations.printSchema()
#     out_registrations.show()
    """
        Объединяем все типы событий в единую таблицу и добавляем неделю и месяц события
    """
    all_events =  out_messages.unionByName(out_reactions) \
                              .unionByName(out_subscriptions) \
                              .unionByName(out_registrations) \
                              .withColumn("week", F.date_trunc("week", "event_date")) \
                              .withColumn("month", F.date_trunc("month", "event_date"))
#     all_events.show()

    """
        Собираем витрину по неделям
    """
    zone_datamart_by_week = all_events.groupBy("zone_id", "month", "week") \
                                      .pivot("event_type", ["message", "reaction", "subscription", "registration"]) \
                                      .agg(F.count("event_date")) \
                                      .withColumnRenamed("message", "week_message") \
                                      .withColumnRenamed("reaction", "week_reaction") \
                                      .withColumnRenamed("subscription", "week_subscription") \
                                      .withColumnRenamed("registration", "week_user")
                                      
    
#     zone_datamart_by_week.show(100)
    """
        Собираем витрину по месяцам (минус одна группировка по неделям)
    """
    zone_datamart_by_month = all_events.groupBy("zone_id", "month") \
                                       .pivot("event_type", ["message", "reaction", "subscription", "registration"]) \
                                       .agg(F.count("event_date")) \
                                       .withColumnRenamed("message", "month_message") \
                                       .withColumnRenamed("reaction", "month_reaction") \
                                       .withColumnRenamed("subscription", "month_subscription") \
                                       .withColumnRenamed("registration", "month_user")
                                      
    
#     zone_datamart_by_month.show(100)
    """
        Объединяем данные в выходную витрину
    """
    zone_datamart = zone_datamart_by_week.join(zone_datamart_by_month, ["zone_id", "month"], "inner") \
                                         .select("month",
                                                 "week",
                                                 "zone_id",
                                                 "week_message",
                                                 "week_reaction",
                                                 "week_subscription",
                                                 "week_user",
                                                 "month_message",
                                                 "month_reaction",
                                                 "month_subscription",
                                                 "month_user",
                                                ) \
                                         .orderBy("zone_id", "month", "week") \
                                         .na.fill(value=0)
#     zone_datamart.show(40)
    return zone_datamart
    
if __name__ == '__main__':
    datamart_by_zone("/user/yurgen001/data/geo/events", '2022-05-31', 60, spark)#.repartition(1) \
        #.write.mode('overwrite').parquet('/user/yurgen001/data/analytics/dm_by_zone_d60')

24/05/28 11:45:22 WARN CacheManager: Asked to cache already cached data.        
                                                                                

+-------------------+-------------------+--------+------------+-------------+-----------------+---------+-------------+--------------+------------------+----------+
|              month|               week| zone_id|week_message|week_reaction|week_subscription|week_user|month_message|month_reaction|month_subscription|month_user|
+-------------------+-------------------+--------+------------+-------------+-----------------+---------+-------------+--------------+------------------+----------+
|2021-04-01 00:00:00|2021-03-29 00:00:00|Adelaide|         550|            0|                0|      298|         3445|             0|                 0|       949|
|2021-04-01 00:00:00|2021-04-05 00:00:00|Adelaide|        1065|            0|                0|      272|         3445|             0|                 0|       949|
|2021-04-01 00:00:00|2021-04-12 00:00:00|Adelaide|         852|            0|                0|      180|         3445|             0|                 0|       949|
|2021-04-0

### Шаг 4. Построить витрину для рекомендации друзей
Напомним, как будет работать рекомендация друзей: если пользователи подписаны на один канал, ранее никогда не переписывались и расстояние между ними не превышает 1 км, то им обоим будет предложено добавить другого в друзья. Образовывается парный атрибут, который обязан быть уникальным: порядок упоминания не должен создавать дубли пар.
Витрина будет содержать следующие атрибуты:

- user_left — первый пользователь;
- user_right — второй пользователь;
- processed_dttm — дата расчёта витрины;
- zone_id — идентификатор зоны (города);
- local_time — локальное время.

In [None]:
from datetime import datetime, timedelta
from typing  import List
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import os 
import sys

os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

spark = SparkSession.builder \
                    .master("yarn") \
                    .appName("datamartFriendsRecomendationJob") \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .getOrCreate()


def input_paths(date: str, depth: int, base_path: str) -> List:
    dt = datetime.strptime(date, '%Y-%m-%d').date()
    path_list = [base_path + '/date=' + (dt - timedelta(days=i)).strftime('%Y-%m-%d')
                 for i in range(depth)
                ]
    return path_list


def datamart_friends_recomendation(base_path:str, date: str, depth: int, spark: SparkSession) -> DataFrame:

    EARTH_RADIUS = 6371

    events_path = input_paths(date, depth, base_path)
#     print(events_path)
    events = spark.read.option('basePath', base_path).parquet(*events_path)
#     events.printSchema()

    schema_geo_csv = StructType([ 
        StructField("id",IntegerType(),True), 
        StructField("city",StringType(),True), 
        StructField("lat",StringType(),True), 
        StructField("lng",StringType(),True)
    ])    
    
    geo_city = spark.read.options(delimiter=";", header=True) \
                         .schema(schema_geo_csv) \
                         .csv("/user/yurgen001/data/snapshots/geo_city") \
                         .withColumn("lat", F.regexp_replace("lat", ",", ".").cast(DoubleType())) \
                         .withColumn("lng", F.regexp_replace("lng", ",", ".").cast(DoubleType()))
#     geo_city.printSchema()
#     geo_city.show(10)

       
    
    messages = events.where("event_type = 'message' AND event.message_ts IS NOT NULL") \
                     .selectExpr("event.message_from AS user_id",
                                 "event.message_ts AS message_dt",
                                 "lat AS message_lat",
                                 "lon AS message_lon") \
                     .crossJoin(geo_city.withColumnRenamed("lat", "city_lat") \
                                        .withColumnRenamed("lng", "city_lon")) 

#     messages.show()
  

    
    messages_with_distance = messages.withColumn("distance", 2 * EARTH_RADIUS * F.asin(F.sqrt(F.pow(F.sin((F.radians('message_lat') - F.radians('city_lat')) / F.lit(2)), F.lit(2)) \
                    + F.cos(F.radians('message_lat')) * F.cos(F.radians('city_lat')) * F.pow(F.sin((F.radians('message_lon') - F.radians('city_lon')) / F.lit(2)), F.lit(2))))
                                                )
    
#     messages_with_distance.show()
    distance_rank_window = Window().partitionBy("user_id", "message_dt").orderBy("distance")
    messages_with_city = messages_with_distance.select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city",
                                                       "distance") \
                                               .withColumn("distance_rank", F.row_number().over(distance_rank_window)) \
                                               .where("distance_rank = 1") \
                                               .select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city")
    
    messages_with_city.cache()
#     messages_with_city.show()

    last_dt_window = Window().partitionBy("user_id").orderBy(F.desc("message_dt"))
    user_act_location = messages_with_city.withColumn("dt_rank", F.row_number().over(last_dt_window)) \
                                          .where("dt_rank = 1") \
                                          .selectExpr("user_id",
                                                      "message_lat AS act_lat",
                                                      "message_lon AS act_lon",
                                                      "city AS act_city", 
                                                      "CONCAT('Australia/', city) AS time_zone",
                                                      "FROM_UTC_TIMESTAMP(message_dt, CONCAT('Australia/', 'Sydney')) AS local_time"                            
                                                      ) 

    # user_act_location.show(20, False)
    """
        ___________________________________________________________________________________
        до этого места вычисления те же, что и в первой витрине по пользователям
    """

    """
        Формируем таблицу соседей.
        Джойним таблицу текущего местоположения пользоваетеля с самой собой по условию неравенства F.col("df_left.user_id") < F.col("df_right.user_id")
        Получим недублированнеы пары пользователей, как надо по условию задачи
        при этом id левого пользователя всегда меньше id правого
        Далее сичтаем расстояние между ними в километрах и фильтруем тех, кто расположен ближе одного километра друг от друга
    """
    neighbors = user_act_location.alias("df_left").join(user_act_location.alias("df_right"), F.col("df_left.user_id") < F.col("df_right.user_id"), 'inner') \
        .withColumn("distance", 2 * EARTH_RADIUS * F.expr("asin(sqrt(pow(sin((radians(df_left.act_lat) - radians(df_right.act_lat)) / 2), 2)"
        " + cos(radians(df_left.act_lat)) * cos(radians(df_right.act_lat)) * pow(sin((radians(df_left.act_lon) - radians(df_right.act_lon)) / 2), 2)))")
                   ) \
                                .where("distance < 1") \
                                .selectExpr("df_left.user_id AS left_user",
                                            "df_right.user_id AS right_user",
                                            "CURRENT_TIMESTAMP() AS processed_dttm",
                                            "df_left.act_city AS zone_id",
                                            "GREATEST(df_left.local_time, df_right.local_time) AS local_time"
                                           )

#     neighbors.show()

#     messages_from_left_user = events.where("event_type = 'message'") \
#                                     .selectExpr("event.message_from AS message_from",
#                                                 "event.message_to AS message_to") \
#                                     .join(neighbors.select("left_user"), neighbors["left_user"] == F.col("message_from"), "inner") \
#                                     .selectExpr("left_user",
#                                                 "message_to AS right_user")

# #     messages_from_left_user.show()

#     messages_to_left_user = events.where("event_type = 'message'") \
#                                   .selectExpr("event.message_from AS message_from",
#                                              "event.message_to AS message_to") \
#                                   .join(neighbors.select("left_user"), neighbors["left_user"] == F.col("message_to"), "inner") \
#                                   .selectExpr("left_user",
#                                              "message_from AS right_user")
# #     messages_to_left_user.show()

#     left_user_correspondents = messages_from_left_user.unionByName(messages_to_left_user) \
#                                                       .na.drop()
    
#     left_user_correspondents.show()
    """
        Формируем таблицу корреспондентов из общей таблицы событий таким образом,
        что id левого пользователя всегда меньше id правого пользователя
    """

    correspondents = events.where("event_type = 'message' AND event.message_from IS NOT NULL AND event.message_to IS NOT NULL") \
                           .selectExpr("""
                                          CASE 
                                            WHEN event.message_from < event.message_to THEN event.message_from
                                            ELSE event.message_to  
                                          END AS left_user
                                       """,
                                       """
                                         CASE 
                                            WHEN event.message_from > event.message_to THEN event.message_from
                                            ELSE event.message_to  
                                          END AS right_user
                                       """
                                     ) 

#     correspondents.show()
    """
        Из таблицы соседей выбираем те пары, которые не обменивались сообщениями
    """
    neighbors_without_messages = neighbors.join(correspondents, ["left_user", "right_user"], "leftanti") 
    
    neighbors_without_messages.cache()
#     neighbors_without_messages.show()
    
#     left_user_subscriptions = events.where("event_type = 'subscription'") \
#                              .selectExpr("CAST(event.user AS LONG) AS left_user",
#                                          "event.subscription_channel AS subscription_channel") \
#                              .join(neighbors_without_messages.select("left_user"), ["left_user"], "inner") \
#                              .selectExpr("left_user",
#                                          "subscription_channel")

# #     left_user_subscriptions.show()
    
#     right_user_subscritions = events.where("event_type = 'subscription'") \
#                               .selectExpr("CAST(event.user AS LONG) AS right_user",
#                                           "event.subscription_channel AS subscription_channel") \
#                               .join(neighbors_without_messages.select("right_user"), ["right_user"], "inner") \
#                               .selectExpr("right_user",
#                                           "subscription_channel")
    
# #     right_user_subscritions.show()
    
#     common_subscribers = left_user_subscriptions.join(right_user_subscritions, ["subscription_channel"], "inner") \
#                                          .select("left_user",
#                                                  "right_user")
    
#     common_subscribers.show()

    # friends_recomendation = neighbors_without_messages.join(common_subscribers, ["left_user", "right_user"], "leftsemi") \
    #                                                   .select("left_user",
    #                                                           "right_user",
    #                                                           "processed_dttm",
    #                                                           "zone_id",
    #                                                           "local_time") 

    """
        из таблицы событий выбираем события подписки, группируем по пользователю и 
        формируем для каждого пользователя множество каналов, на которые он подписан
    """
    user_subscriptions = events.where("event_type = 'subscription'") \
                               .selectExpr("CAST(event.user AS LONG) AS user_id",
                                           "event.subscription_channel AS subscription_channel") \
                               .groupBy("user_id") \
                               .agg(F.collect_set("subscription_channel").alias("subscription_set"))
    user_subscriptions.show()

    """
        формируем выходную витрину на основе таблицы соседей, не обменивавшихся сообщениями
        джойним их списки каналов, получваем пересечение списков
        и выбираем те пары корреспондентов, у которыъх пересечение не пустое (есть общие каналы в подписках)
    """

    friends_recomendation = neighbors_without_messages.join(user_subscriptions.alias("l_sub"), F.expr("left_user = l_sub.user_id"), "inner") \
                                                      .join(user_subscriptions.alias("r_sub"), F.expr("right_user = r_sub.user_id"), "inner") \
                                                      .withColumn("common_subscriptions", F.expr("ARRAY_INTERSECT(l_sub.subscription_set, r_sub.subscription_set)")) \
                                                      .where("size(common_subscriptions) > 0") \
                                                      .select("left_user",
                                                              "right_user",
                                                              "processed_dttm",
                                                              "zone_id",
                                                              "local_time") 
    



    friends_recomendation.show()
    
    
if __name__ == '__main__':
    datamart_friends_recomendation("/user/yurgen001/data/geo/events", '2022-05-31', 60, spark)#.repartition(1) \
        #.write.mode('overwrite').parquet('/user/yurgen001/data/analytics/dm_friends_recomendation_d60')


24/05/28 12:16:37 WARN CacheManager: Asked to cache already cached data.
24/05/28 12:16:37 WARN CacheManager: Asked to cache already cached data.
                                                                                

+-------+--------------------+
|user_id|    subscription_set|
+-------+--------------------+
|      0|[247658, 658545, ...|
|      6|[952414, 680730, ...|
|      7|[647264, 594330, ...|
|     19|[794489, 367857, ...|
|     22|[300592, 410010, ...|
|     25|[997444, 378549, ...|
|     26|[832165, 978476, ...|
|     29|[475118, 758368, ...|
|     31|[980487, 978636, ...|
|     32|[584995, 284651, ...|
|     34|[498167, 648586, ...|
|     39|[371530, 971965, ...|
|     43|[590526, 592633, ...|
|     50|[247040, 621916, ...|
|     54|[78190, 200698, 6...|
|     57|[877622, 311946, ...|
|     58|[93479, 538602, 6...|
|     65|[733803, 548496, ...|
|     68|[568545, 770178, ...|
|     71|[952414, 201612, ...|
+-------+--------------------+
only showing top 20 rows



[Stage 164:>                                                        (0 + 1) / 1]

+---------+----------+--------------------+-----------+--------------------+
|left_user|right_user|      processed_dttm|    zone_id|          local_time|
+---------+----------+--------------------+-----------+--------------------+
|     4360|      6664|2024-05-28 11:55:...|   Canberra|2021-04-29 03:12:...|
|     8382|      9349|2024-05-28 11:55:...| Launceston|2021-05-24 01:19:...|
|     6978|     11259|2024-05-28 11:55:...|   Maitland|2021-05-31 06:42:...|
|    10205|     13101|2024-05-28 11:55:...|    Ipswich|2021-04-19 09:32:...|
|     6317|     16521|2024-05-28 11:55:...| Cranbourne|2021-05-08 13:37:...|
|     7137|     16834|2024-05-28 11:55:...|   Maitland|2021-05-10 12:11:...|
|     2837|     18086|2024-05-28 11:55:...|   Maitland|2021-05-01 17:47:...|
|     5950|     18203|2024-05-28 11:55:...| Launceston|2021-04-25 17:26:...|
|    17371|     18358|2024-05-28 11:55:...| Cranbourne|2021-05-22 22:59:...|
|     3610|     18700|2024-05-28 11:55:...|      Perth|2021-04-14 11:02:...|


                                                                                

In [16]:
from datetime import datetime, timedelta
from typing  import List, Tuple
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
import pyspark.sql.functions as F
import os 
import sys

os.environ["HADOOP_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

spark = SparkSession.builder \
                    .master("yarn") \
                    .appName("datamartFriendsRecomendationJob") \
                    .config("spark.executor.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .getOrCreate()


def input_paths(date: str, depth: int, base_path: str) -> List:
    dt = datetime.strptime(date, '%Y-%m-%d').date()
    path_list = [base_path + '/date=' + (dt - timedelta(days=i)).strftime('%Y-%m-%d')
                 for i in range(depth)
                ]
    return path_list


def common_load_and_calculation(base_path:str, date: str, depth: int, spark: SparkSession) -> Tuple[DataFrame]:

    EARTH_RADIUS = 6371

    events_path = input_paths(date, depth, base_path)
#     print(events_path)
    events = spark.read.option('basePath', base_path).parquet(*events_path)
#     events.printSchema()

    schema_geo_csv = StructType([ 
        StructField("id",IntegerType(),True), 
        StructField("city",StringType(),True), 
        StructField("lat",StringType(),True), 
        StructField("lng",StringType(),True)
    ])    
    
    geo_city = spark.read.options(delimiter=";", header=True) \
                         .schema(schema_geo_csv) \
                         .csv("/user/yurgen001/data/snapshots/geo_city") \
                         .withColumn("lat", F.regexp_replace("lat", ",", ".").cast(DoubleType())) \
                         .withColumn("lng", F.regexp_replace("lng", ",", ".").cast(DoubleType()))
#     geo_city.printSchema()
#     geo_city.show(10)

       
    
    messages = events.where("event_type = 'message' AND event.message_ts IS NOT NULL") \
                     .selectExpr("event.message_from AS user_id",
                                 "event.message_ts AS message_dt",
                                 "lat AS message_lat",
                                 "lon AS message_lon") \
                     .crossJoin(geo_city.withColumnRenamed("lat", "city_lat") \
                                        .withColumnRenamed("lng", "city_lon")) 

#     messages.show()
  

    
    messages_with_distance = messages.withColumn("distance", 2 * EARTH_RADIUS * F.asin(F.sqrt(F.pow(F.sin((F.radians('message_lat') - F.radians('city_lat')) / F.lit(2)), F.lit(2)) \
                    + F.cos(F.radians('message_lat')) * F.cos(F.radians('city_lat')) * F.pow(F.sin((F.radians('message_lon') - F.radians('city_lon')) / F.lit(2)), F.lit(2))))
                                                )
    
#     messages_with_distance.show()
    distance_rank_window = Window().partitionBy("user_id", "message_dt").orderBy("distance")
    messages_with_city = messages_with_distance.select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city",
                                                       "distance") \
                                               .withColumn("distance_rank", F.row_number().over(distance_rank_window)) \
                                               .where("distance_rank = 1") \
                                               .select("user_id",
                                                       "message_dt",
                                                       "message_lat",
                                                       "message_lon",
                                                       "city")
    
    messages_with_city.cache()
#     messages_with_city.show()

    last_dt_window = Window().partitionBy("user_id").orderBy(F.desc("message_dt"))
    user_act_location = messages_with_city.withColumn("dt_rank", F.row_number().over(last_dt_window)) \
                                          .where("dt_rank = 1") \
                                          .selectExpr("user_id",
                                                      "message_lat AS act_lat",
                                                      "message_lon AS act_lon",
                                                      "city AS act_city", 
                                                      "CONCAT('Australia/', city) AS time_zone",
                                                      "FROM_UTC_TIMESTAMP(message_dt, CONCAT('Australia/', 'Sydney')) AS local_time"                            
                                                      ) 

    # user_act_location.show(20, False)
    return (events,
            messages_with_city,
            user_act_location)



def datamart_by_users(messages_with_city: DataFrame,
                      user_act_location: DataFrame) -> DataFrame:

    DAYS_COUNT = 27 # время непрерывного нахождения для определения домашнего города
     
    seq_num_window = Window().partitionBy("user_id").orderBy("message_dt")
    seq_num_city_window = Window().partitionBy("user_id", "city").orderBy("message_dt")
    user_city_visits = messages_with_city.withColumn("seq_num", F.row_number().over(seq_num_window)) \
                                         .withColumn("seq_num_city", F.row_number().over(seq_num_city_window)) \
                                         .withColumn("visit_num", F.col("seq_num") - F.col("seq_num_city"))

#     user_city_visits.show(40, False)
    user_city_visits = user_city_visits.groupBy("user_id", "city", "visit_num") \
                                       .agg(F.min("message_dt").alias("start_visit_dt"), \
                                            F.max("message_dt").alias("end_visit_dt"), \
                                           ) \
                                       .withColumn("visit_day_count", F.datediff(F.to_date("end_visit_dt"), F.to_date("start_visit_dt")))
#     user_city_visits.show(40, False)

    home_city_window = Window().partitionBy("user_id").orderBy(F.desc("start_visit_dt"))
    user_home_city = user_city_visits.where(f"visit_day_count >= {DAYS_COUNT}") \
                                     .withColumn("home_city_rank", F.row_number().over(home_city_window)) \
                                     .where("home_city_rank = 1") \
                                     .selectExpr("user_id",
                                                 "city AS home_city")             
#     user_home_city.show()
    
    user_travel = user_city_visits.orderBy("start_visit_dt") \
                                  .groupBy("user_id") \
                                  .agg(F.count("city").alias("travel_count"), \
                                       F.collect_list("city").alias("travel_array")
                                      )                     
#     user_travel.show(40, False)
    
    user_datamart = user_act_location.join(user_travel, "user_id", "left") \
                                 .join(user_home_city, "user_id", "left") \
                                 .select("user_id",
                                         "act_city",
                                         "home_city",
                                         "travel_count",
                                         "travel_array",
                                         "local_time")
    user_datamart.show()

    return user_datamart



def datamart_by_zone(events: DataFrame,
                     messages_with_city: DataFrame,
                     user_act_location: DataFrame) -> DataFrame:


    out_messages = messages_with_city.selectExpr("TO_DATE(message_dt) AS event_date",
                                                 "'message' AS event_type",
                                                 "city AS zone_id")
    
#     out_messages.show()


    out_reactions = events.where("event_type = 'reaction'") \
                      .selectExpr("CAST(event.reaction_from AS LONG) AS user_id",
                                  "TO_DATE(event.datetime) AS event_date") \
                      .join(user_act_location, "user_id", "inner") \
                      .selectExpr("event_date",
                                  "'reaction' AS event_type",
                                  "act_city AS zone_id")
    
#     out_reactions.printSchema()
#     out_reactions.show()
    

    out_subscriptions = events.where("event_type = 'subscription'") \
                              .selectExpr("CAST(event.user AS LONG) AS user_id",
                                          "TO_DATE(event.datetime) AS event_date") \
                              .join(user_act_location, "user_id", "inner") \
                              .selectExpr("event_date",
                                          "'subscription' AS event_type",
                                          "act_city AS zone_id")
    
#     out_subscriptions.printSchema()
#     out_subscriptions.show()
    
    out_registrations = events.where("event_type = 'registration'") \
                              .selectExpr("CAST(event.user AS LONG) AS user_id",
                                          "TO_DATE(event.datetime) AS event_date") \
                              .join(user_act_location, "user_id", "inner") \
                              .selectExpr("event_date",
                                          "'registration' AS event_type",
                                          "act_city AS zone_id")

#     out_registrations.printSchema()
#     out_registrations.show()

    all_events =  out_messages.unionByName(out_reactions) \
                              .unionByName(out_subscriptions) \
                              .unionByName(out_registrations) \
                              .withColumn("month", F.month("event_date")) \
                              .withColumn("week", F.expr("FLOOR(DAYOFMONTH(event_date) / 7) + 1"))
    
#     all_events.show()

    zone_datamart_by_week = all_events.groupBy("zone_id", "month", "week") \
                                      .pivot("event_type", ["message", "reaction", "subscription", "registration"]) \
                                      .agg(F.count("event_date")) \
                                      .withColumnRenamed("message", "week_message") \
                                      .withColumnRenamed("reaction", "week_reaction") \
                                      .withColumnRenamed("subscription", "week_subscription") \
                                      .withColumnRenamed("registration", "week_user")
                                      
    
#     zone_datamart_by_week.show(100)
    
    zone_datamart_by_month = all_events.groupBy("zone_id", "month") \
                                       .pivot("event_type", ["message", "reaction", "subscription", "registration"]) \
                                       .agg(F.count("event_date")) \
                                       .withColumnRenamed("message", "month_message") \
                                       .withColumnRenamed("reaction", "month_reaction") \
                                       .withColumnRenamed("subscription", "month_subscription") \
                                       .withColumnRenamed("registration", "month_user")
                                      
    
#     zone_datamart_by_month.show(100)
    
    zone_datamart = zone_datamart_by_week.join(zone_datamart_by_month, ["zone_id", "month"], "inner") \
                                         .select("month",
                                                 "week",
                                                 "zone_id",
                                                 "week_message",
                                                 "week_reaction",
                                                 "week_subscription",
                                                 "week_user",
                                                 "month_message",
                                                 "month_reaction",
                                                 "month_subscription",
                                                 "month_user",
                                                ) \
                                         .na.fill(value=0)
    zone_datamart.show()
    return zone_datamart


def datamart_friends_recomendation(events:DataFrame,
                                   user_act_location: DataFrame) -> DataFrame:

    EARTH_RADIUS = 6371

    neighbors = user_act_location.alias("df_left").join(user_act_location.alias("df_right"), F.col("df_left.user_id") < F.col("df_right.user_id"), 'inner') \
        .withColumn("distance", 2 * EARTH_RADIUS * F.expr("asin(sqrt(pow(sin((radians(df_left.act_lat) - radians(df_right.act_lat)) / 2), 2)"
        " + cos(radians(df_left.act_lat)) * cos(radians(df_right.act_lat)) * pow(sin((radians(df_left.act_lon) - radians(df_right.act_lon)) / 2), 2)))")
                   ) \
                                .where("distance < 1") \
                                .selectExpr("df_left.user_id AS left_user",
                                            "df_right.user_id AS right_user",
                                            "CURRENT_TIMESTAMP() AS processed_dttm",
                                            "df_left.act_city AS zone_id",
                                            "GREATEST(df_left.local_time, df_right.local_time) AS local_time"
                                           )

#     neighbors.show()

    messages_from_left_user = events.where("event_type = 'message'") \
                                    .selectExpr("event.message_from AS message_from",
                                                "event.message_to AS message_to") \
                                    .join(neighbors.select("left_user"), neighbors["left_user"] == F.col("message_from"), "inner") \
                                    .selectExpr("left_user",
                                                "message_to AS right_user")

#     messages_from_left_user.show()

    messages_to_left_user = events.where("event_type = 'message'") \
                                 .selectExpr("event.message_from AS message_from",
                                             "event.message_to AS message_to") \
                                 .join(neighbors.select("left_user"), neighbors["left_user"] == F.col("message_to"), "inner") \
                                 .selectExpr("left_user",
                                             "message_from AS right_user")
#     messages_to_left_user.show()

    left_user_correspondents = messages_from_left_user.unionByName(messages_to_left_user) \
                                                      .na.drop()
    
#     left_user_correspondents.show()

    neighbors_without_messages = neighbors.join(left_user_correspondents, ["left_user", "right_user"], "leftanti") 
    
    neighbors_without_messages.cache()
#     neighbors_without_messages.show()
    
    left_user_subscriptions = events.where("event_type = 'subscription'") \
                             .selectExpr("CAST(event.user AS LONG) AS left_user",
                                         "event.subscription_channel AS subscription_channel") \
                             .join(neighbors_without_messages.select("left_user"), ["left_user"], "inner") \
                             .selectExpr("left_user",
                                         "subscription_channel")

#     left_user_subscriptions.show()
    
    right_user_subscritions = events.where("event_type = 'subscription'") \
                              .selectExpr("CAST(event.user AS LONG) AS right_user",
                                          "event.subscription_channel AS subscription_channel") \
                              .join(neighbors_without_messages.select("right_user"), ["right_user"], "inner") \
                              .selectExpr("right_user",
                                          "subscription_channel")
    
#     right_user_subscritions.show()
    
    common_subscribers = left_user_subscriptions.join(right_user_subscritions, ["subscription_channel"], "inner") \
                                         .select("left_user",
                                                 "right_user")
    
#     common_subscribers.show()

    friends_recomendation = neighbors_without_messages.join(common_subscribers, ["left_user", "right_user"], "leftsemi")

    friends_recomendation.show()
    return friends_recomendation
    

def datamarts_calculation(base_path:str, date: str, depth: int, spark: SparkSession) -> None:
    
    events, messages_with_city, user_act_location = common_load_and_calculation(base_path, date, depth, spark)

    datamart_by_users(messages_with_city, user_act_location).write \
                                                            .mode('overwrite') \
                                                            .parquet(f'/user/yurgen001/data/analytics/dm_by_user_d{depth}/date={date}')

    datamart_by_zone(events, messages_with_city, user_act_location).write \
                                                                   .mode('overwrite') \
                                                                   .parquet(f'/user/yurgen001/data/analytics/dm_by_zone_d{depth}/date={date}')

    datamart_friends_recomendation(events, user_act_location).write \
                                                             .mode('overwrite') \
                                                             .parquet(f'/user/yurgen001/data/analytics/dm_friends_recomendation_d{depth}/date={date}')


if __name__ == '__main__':
    datamarts_calculation("/user/yurgen001/data/geo/events", '2022-05-31', 60, spark)


24/05/26 16:28:16 WARN CacheManager: Asked to cache already cached data.
                                                                                

+----------+----------+-----------+-----+----+
|event_date|event_type|    zone_id|month|week|
+----------+----------+-----------+-----+----+
|2021-04-25|   message|      Perth|    4|   4|
|2021-04-29|   message|     Sydney|    4|   5|
|2021-04-25|   message|  Melbourne|    4|   4|
|2021-05-07|   message|     Darwin|    5|   2|
|2021-05-20|   message|    Bendigo|    5|   3|
|2021-04-27|   message|      Perth|    4|   4|
|2021-05-04|   message|   Brisbane|    5|   1|
|2021-04-28|   message|   Brisbane|    4|   5|
|2021-05-10|   message|   Brisbane|    5|   2|
|2021-05-04|   message|    Geelong|    5|   1|
|2021-04-25|   message|    Geelong|    4|   4|
|2021-04-24|   message|     Sydney|    4|   4|
|2021-05-03|   message|   Canberra|    5|   1|
|2021-06-05|   message|Rockhampton|    6|   1|
|2021-05-07|   message|  Newcastle|    5|   2|
|2021-05-20|   message|    Bendigo|    5|   3|
|2021-05-08|   message|      Perth|    5|   2|
|2021-04-23|   message|     Cairns|    4|   4|
|2021-04-23| 



+-----+----+----------+------------+-------------+-----------------+---------+-------------+--------------+------------------+----------+
|month|week|   zone_id|week_message|week_reaction|week_subscription|week_user|month_message|month_reaction|month_subscription|month_user|
+-----+----+----------+------------+-------------+-----------------+---------+-------------+--------------+------------------+----------+
|    5|   1|  Canberra|         432|         2950|             5797|        0|         1101|         22655|             45704|         0|
|    5|   2|  Canberra|         285|         4002|             8019|        0|         1101|         22655|             45704|         0|
|    5|   3|  Canberra|         236|         4938|             9691|        0|         1101|         22655|             45704|         0|
|    5|   4|  Canberra|         119|         6564|            12936|        0|         1101|         22655|             45704|         0|
|    5|   5|  Canberra|          2


                                                                                