In [1]:
user_data_path = "/user/t.chahoyan/data_1/trainDemography"
country_data_path = "/user/t.chahoyan/data_1/geography/countries.csv"
current_dt = "2019-05-01"
output_path = "/user/t.chahoyan/hometask_1"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
420,application_1544538869062_0689,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, LongType, StringType

In [3]:
#таблица пользователей

schema_user = StructType([
    StructField("userId", IntegerType()),
    StructField("create_date", LongType()),
    StructField("birth_date", IntegerType()),
    StructField("gender", IntegerType()),
    StructField("ID_country", LongType()),
    StructField("ID_Location", IntegerType()),
    StructField("loginRegion", IntegerType()),
    StructField("isInGraph", IntegerType())
])

df_user = spark.read.schema(schema_user).csv(user_data_path, sep='\t')
df_user = df_user.select(["ID_country", "userId", "birth_date", "gender"])

In [4]:
# таблица стран

schema_countries = StructType([
    StructField("ID_country", LongType()),
    StructField("name_country", StringType()),
])

df_countries = spark.read.schema(schema_countries).csv(country_data_path, sep=',')

In [26]:
from pyspark.sql.functions import lit, avg, col, year, count, desc
from pyspark.sql.functions import round as F_round

# из таблицы пользователей вытаскиваем id страны и дату рождения; 
# берём текущую дату и 1970-01-01, от которой ведётся отсчёт в исходных данных;
# с помощью функции date_sub считаем дату рождения пользователя 
# (так как birth_date - количество дней от 1970-01-01 - так сказано в документации);
# с помощью функции datediff считаем разницу между текущей датой и датой рождения - получаем возраст;
# далее с помощью функции year получаем год от возраста;
# далее группируем по id страны и подсчитываем количество пользователей из данной страны и средний возраст;
# результат записываем в first_part


first_part = df_user \
    .select("ID_country", "birth_date") \
    .withColumn("static_date", lit("1970-01-01").cast("date")) \
    .withColumn("current_date", lit(current_dt).cast("date")) \
    .selectExpr('*', 'date_sub(static_date, -birth_date) as born') \
    .selectExpr('*', 'datediff(current_date, born) as age') \
    .withColumn("age", year(col("current_date")) - year(col("born"))) \
    .groupby("ID_country") \
    .agg(count("ID_country").alias("user_cnt"), avg("age").alias("age_avg")) \

In [27]:
# из таблицы пользователей вытаскиваем id страны и пол, группируем по id страны;
# подсчитываем количество мужчин и женщин для каждой страны; результат записываем в second_part

second_part = df_user \
          .select(["ID_country", "gender"]) \
          .groupby("ID_country") \
          .pivot("gender") \
          .count() \
          .select("ID_country", col("1").alias("men_cnt"), col("2").alias("women_cnt"))

# объединяем таблицы first_part и second_part по id стран;
# находим долю мужчин и долю женщин, округляя до 2 знака после запятой;
# округляем средний возраст до 2 знака после запятой;
# выводим в нужном порядке

result = first_part.join(second_part, "ID_country") \
.join(df_countries, "ID_country") \
.withColumn("men_share", F_round(col("men_cnt") / col("user_cnt"), 2)) \
.withColumn("women_share", F_round(col("women_cnt") / col("user_cnt"), 2)) \
.withColumn("age_avg", F_round(col("age_avg"), 2)) \
.select(["name_country", "user_cnt", "age_avg", "men_cnt", "women_cnt", "men_share", "women_share"]) \
.orderBy(col("user_cnt").desc())

In [30]:
# выгружаем результат на hdfs одним .csv файлом

result \
.sortWithinPartitions('user_cnt', ascending = False) \
.repartition(1) \
.write.csv(output_path, mode = 'overwrite', sep = '\t', header = True)