In [9]:
%region us-east-1
%glue_version 3.0
%idle_timeout 600
%number_of_workers 5
%worker_type G.1X
%iam_role arn:aws:iam::548604325312:role/service-role/AmazonSageMaker-ExecutionRole-20230209T105554
%%configure
{
# "--enable-spark-ui": "true",
# "--spark-event-logs-path": "s3://sagemaker-us-west-2-324874492192/glue-interactive-sess-demo/logs/",
# "--additional-python-modules":"",
# "--extra-jars":""
}

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
It looks like there is a newer version of the kernel available. The latest version is 0.37.2 and you have 0.37.0 installed.
Please run `pip install --upgrade aws-glue-sessions` to upgrade your kernel
Previous region: us-east-1
Setting new region to: us-east-1
Reauthenticating Glue client with new region: us-east-1
IAM role has been set to None. Reauthenticating.
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::548604325312:role/service-role/AmazonSageMaker-ExecutionRole-20230209T105554
Authentication done.
Region is set to: us-east-1
Setting Glue version to: 3.0
Current idle_timeout is 2880 minutes.
idle_timeout has been set to 600 minu

No configuration values were provided.

In [1]:
import argparse
import random
from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 6a24ca41-cd01-489f-afeb-5f2bac3cc8a7
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 6a24ca41-cd01-489f-afeb-5f2bac3cc8a7 to get into ready status...
Session 6a24ca41-cd01-489f-afeb-5f2bac3cc8a7 has been created.



In [21]:
spark = SparkSession.builder.appName("anker_workshop").config(
    "spark.hadoop.hive.metastore.client.factory.class",
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
).enableHiveSupport().getOrCreate()

df1_data = [(123, 'CN', '1203', 0.89, 102.9), 
            (122, 'CN', '1203', None, 102.9), 
            (125, 'CN', '1203', 0.89, None), 
            (124, 'US', '5223', 0.89, 142.9)]
df1 = spark.createDataFrame(df1_data, ['asin', 'country_code', 'daily_budget', 'rate', 'price']) # list -> DF

df2_data = [(123, 'CN', '2023-02-09 01:00:20', 20), 
            (122, 'CN', '2023-02-08 02:12:02', 12), 
            (121, 'CN', '2023-02-08 03:12:44', 12), 
            (125, 'CN', '2023-02-04 04:04:01', 9), 
            (124, 'US', '2023-02-02 05:02:02', 8)]
df2 = spark.createDataFrame(df2_data, ['asin', 'country_code', 'report_date','review_num']) # list -> DF

df1.show()
df2.show()

+----+------------+------------+----+-----+
|asin|country_code|daily_budget|rate|price|
+----+------------+------------+----+-----+
| 123|          CN|        1203|0.89|102.9|
| 122|          CN|        1203|null|102.9|
| 125|          CN|        1203|0.89| null|
| 124|          US|        5223|0.89|142.9|
+----+------------+------------+----+-----+

+----+------------+-------------------+----------+
|asin|country_code|        report_date|review_num|
+----+------------+-------------------+----------+
| 123|          CN|2023-02-09 01:00:20|        20|
| 122|          CN|2023-02-08 02:12:02|        12|
| 121|          CN|2023-02-08 03:12:44|        12|
| 125|          CN|2023-02-04 04:04:01|         9|
| 124|          US|2023-02-02 05:02:02|         8|
+----+------------+-------------------+----------+


In [3]:
df_cast = df1.withColumn("daily_budget",df1.daily_budget.cast('float'))
df_cast.printSchema()

root
 |-- asin: long (nullable = true)
 |-- country_code: string (nullable = true)
 |-- daily_budget: float (nullable = true)
 |-- rate: double (nullable = true)
 |-- price: double (nullable = true)


In [4]:
# dropna 
# help(df.dropna) # check doc

# 没有inplace参数，spark中可以接都是inplace=Flase
df_dropna = df1.dropna(how="any", subset=['rate', 'price'])
df_dropna.show()

+----+------------+------------+----+-----+
|asin|country_code|daily_budget|rate|price|
+----+------------+------------+----+-----+
| 123|          CN|        1203|0.89|102.9|
| 124|          US|        5223|0.89|142.9|
+----+------------+------------+----+-----+


In [5]:
#分组填充空值

import pyspark.sql.functions as F
from pyspark.sql import Window

df1_ffill = df1

w_forward = Window.partitionBy('country_code').rowsBetween(Window.unboundedPreceding,Window.currentRow)
for fil_col in ['rate', 'price']:
    df1_ffill = df1_ffill.withColumn('ffill_{}'.format(fil_col), F.last(fil_col,ignorenulls=True).over(w_forward))
    
df1.show()
df1_ffill.show()

+----+------------+------------+----+-----+
|asin|country_code|daily_budget|rate|price|
+----+------------+------------+----+-----+
| 123|          CN|        1203|0.89|102.9|
| 122|          CN|        1203|null|102.9|
| 125|          CN|        1203|0.89| null|
| 124|          US|        5223|0.89|142.9|
+----+------------+------------+----+-----+

+----+------------+------------+----+-----+----------+-----------+
|asin|country_code|daily_budget|rate|price|ffill_rate|ffill_price|
+----+------------+------------+----+-----+----------+-----------+
| 124|          US|        5223|0.89|142.9|      0.89|      142.9|
| 123|          CN|        1203|0.89|102.9|      0.89|      102.9|
| 122|          CN|        1203|null|102.9|      0.89|      102.9|
| 125|          CN|        1203|0.89| null|      0.89|      102.9|
+----+------------+------------+----+-----+----------+-----------+


In [6]:
# drop column
col_list = ['daily_budget', 'rate']
  
# delete two columns
df1_drop = df1.drop(*col_list)
df1_drop.show()

+----+------------+-----+
|asin|country_code|price|
+----+------------+-----+
| 123|          CN|102.9|
| 122|          CN|102.9|
| 125|          CN| null|
| 124|          US|142.9|
+----+------------+-----+


In [7]:
# merge 表

df_merge = df1.join(df2, df1.asin ==  df2.asin,"left")
df_merge.show(truncate=False)

+----+------------+------------+----+-----+----+------------+-----------+----------+
|asin|country_code|daily_budget|rate|price|asin|country_code|report_date|review_num|
+----+------------+------------+----+-----+----+------------+-----------+----------+
|123 |CN          |1203        |0.89|102.9|123 |CN          |2023-02-09 |20        |
|122 |CN          |1203        |null|102.9|122 |CN          |2023-02-08 |12        |
|125 |CN          |1203        |0.89|null |125 |CN          |2023-02-04 |9         |
|124 |US          |5223        |0.89|142.9|124 |US          |2023-02-02 |8         |
+----+------------+------------+----+-----+----+------------+-----------+----------+


In [8]:
# + - * / 作操等

df_op = df1_ffill.withColumn('result', df1.rate * df1.price)
df_op.show()

+----+------------+------------+----+-----+----------+-----------+------------------+
|asin|country_code|daily_budget|rate|price|ffill_rate|ffill_price|            result|
+----+------------+------------+----+-----+----------+-----------+------------------+
| 124|          US|        5223|0.89|142.9|      0.89|      142.9|127.18100000000001|
| 123|          CN|        1203|0.89|102.9|      0.89|      102.9|            91.581|
| 122|          CN|        1203|null|102.9|      0.89|      102.9|              null|
| 125|          CN|        1203|0.89| null|      0.89|      102.9|              null|
+----+------------+------------+----+-----+----------+-----------+------------------+


In [122]:
# 按天补数据，resample在pyspark中没有对应函数，参考网上的方案 自己实现了一个

# Note: 
# date_field 需要时 timestamp / datetime 字段类型，不能是string
# groupBy_fields 需要时groupby 的数组类型

def resample(df, date_field, groupBy_fields):
    # date_field = 'readtime'
    # groupBy_fields = ['house']
    date_min_name = '{}_min'.format(date_field)
    date_max_name = '{}_max'.format(date_field)

    df_base = df.groupby(groupBy_fields).agg(func.min(date_field).cast('integer').alias(date_min_name), \
                               func.max(date_field).cast('integer').alias(date_max_name))

    def date_range(t1, t2, step=60*60*24):
        """Return a list of equally spaced points between t1 and t2 with stepsize step."""
        return [t1 + step*x for x in range(int((t2-t1)/step)+1)]

    # define udf
    date_range_udf = func.udf(date_range, ArrayType(LongType()))

    # generate timegrid and explode
    df_base = df_base.withColumn(date_field, func.explode(date_range_udf(date_min_name, date_max_name)))\
                 .drop(date_min_name, date_max_name)

    df_base = df_base.withColumn(date_field, col(date_field).cast('timestamp'))

    join_fields = groupBy_fields.copy()
    join_fields.append(date_field)

    sup_df = df_base.join(df, join_fields, "leftouter")
    
    return sup_df




In [123]:
# 按天补数据 example

from pyspark.sql import functions as F
df2 = df2.withColumn('new_date',F.unix_timestamp(df2.report_date, 'yyyy-MM-dd HH:mm:ss').cast('timestamp'))

df_out = resample(df2, 'new_date', ['country_code'])
df_out.show(100)

+------------+-------------------+----+-------------------+----------+
|country_code|           new_date|asin|        report_date|review_num|
+------------+-------------------+----+-------------------+----------+
|          US|2023-02-02 05:02:02| 124|2023-02-02 05:02:02|         8|
|          CN|2023-02-04 04:04:01| 125|2023-02-04 04:04:01|         9|
|          CN|2023-02-05 04:04:01|null|               null|      null|
|          CN|2023-02-06 04:04:01|null|               null|      null|
|          CN|2023-02-07 04:04:01|null|               null|      null|
|          CN|2023-02-08 04:04:01|null|               null|      null|
+------------+-------------------+----+-------------------+----------+


In [137]:
#滑窗特征
df3_data = [(121, 'CN', '2023-02-09 01:00:20', 20), 
            (121, 'CN', '2023-02-08 02:12:02', 12), 
            (121, 'CN', '2023-02-07 03:12:44', 12), 
            (122, 'US', '2023-02-04 04:04:01', 9), 
            (122, 'US', '2023-02-02 05:02:02', 8)]
df3 = spark.createDataFrame(df3_data, ['asin', 'country_code', 'report_date','review_num']) # list -> DF


df3.registerTempTable("asin_review_table")

# 天级聚合表
df3_rolling = spark.sql(
    """
    select asin, country_code, report_date, review_num, 
    avg(review_num) over (partition by asin, country_code order by report_date rows between 2 preceding and current row) as avg_window,
    sum(review_num) over (partition by asin, country_code order by report_date rows between 2 preceding and current row) as sum_window
    from asin_review_table
    """
)




In [138]:
df3_rolling.show()

+----+------------+-------------------+----------+------------------+----------+
|asin|country_code|        report_date|review_num|        avg_window|sum_window|
+----+------------+-------------------+----------+------------------+----------+
| 121|          CN|2023-02-07 03:12:44|        12|              12.0|        12|
| 121|          CN|2023-02-08 02:12:02|        12|              12.0|        24|
| 121|          CN|2023-02-09 01:00:20|        20|14.666666666666666|        44|
| 122|          US|2023-02-02 05:02:02|         8|               8.0|         8|
| 122|          US|2023-02-04 04:04:01|         9|               8.5|        17|
+----+------------+-------------------+----------+------------------+----------+


In [130]:
# onehot 特征处理

import pyspark.sql.functions as F 
categ = df2.select('country_code').distinct().rdd.flatMap(lambda x:x).collect()
exprs = [F.when(F.col('country_code') == cat,1).otherwise(0)\
            .alias(str(cat)) for cat in categ]
df_onehot = df2.select(exprs+df2.columns)




In [131]:
df_onehot.show()

+---+---+----+------------+-------------------+----------+-------------------+
| US| CN|asin|country_code|        report_date|review_num|           new_date|
+---+---+----+------------+-------------------+----------+-------------------+
|  0|  1| 123|          CN|2023-02-09 01:00:20|        20|2023-02-09 01:00:20|
|  0|  1| 122|          CN|2023-02-08 02:12:02|        12|2023-02-08 02:12:02|
|  0|  1| 121|          CN|2023-02-08 03:12:44|        12|2023-02-08 03:12:44|
|  0|  1| 125|          CN|2023-02-04 04:04:01|         9|2023-02-04 04:04:01|
|  1|  0| 124|          US|2023-02-02 05:02:02|         8|2023-02-02 05:02:02|
+---+---+----+------------+-------------------+----------+-------------------+
