In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job


def _read_parquet(glue_context, path):
    """Load the Parquet files under ``path`` into a Glue DynamicFrame.

    Args:
        glue_context: GlueContext used to perform the read.
        path: Location of the Parquet files (an S3 prefix in this job).

    Returns:
        The DynamicFrame produced by ``create_dynamic_frame_from_options``.
    """
    return glue_context.create_dynamic_frame_from_options(
        connection_type="parquet",
        connection_options={"paths": [path]}
    )


def main(features_root="s3://imba-shawn/features", output_path="s3://imba-shawn/output"):
    """Join the four feature tables and write the result as one CSV dataset.

    The feature tables are read directly from Parquet files on S3 (not through
    the Glue/Athena catalog), joined on ``user_id`` and ``product_id``, and
    written with a header row to ``output_path``, overwriting what is there.

    Args:
        features_root: Prefix that contains the ``up_features_db``,
            ``prd_feature_db``, ``user_features_1_db`` and
            ``user_features_2_db`` Parquet folders. A trailing slash is
            tolerated.
        output_path: Destination for the CSV output.

    Returns:
        None. The job's only effect is the data written to ``output_path``.
    """
    # Create the Glue context first; every read below goes through it.
    glueContext = GlueContext(SparkContext.getOrCreate())

    # Read the feature tables straight from their Parquet files.
    root = features_root.rstrip("/")
    up_features = _read_parquet(glueContext, root + "/up_features_db/")
    # NOTE: this prefix really is "prd_feature_db" (singular "feature"),
    # unlike its siblings; keep it in sync with the actual S3 layout.
    prd_features = _read_parquet(glueContext, root + "/prd_feature_db/")
    user_features_1 = _read_parquet(glueContext, root + "/user_features_1_db/")
    user_features_2 = _read_parquet(glueContext, root + "/user_features_2_db/")

    # Each join below follows the same pattern: rename the key on one side so
    # the two key columns have distinct names, join on the pair, then drop the
    # renamed duplicate so a single key column remains.

    # Join the two user-level feature tables on user_id.
    users = Join.apply(
        user_features_1.rename_field('user_id', 'user_id1'),
        user_features_2,
        'user_id1',
        'user_id'
    ).drop_fields(['user_id1'])

    # Attach the user-level features to the user/product rows.
    up_with_users = Join.apply(
        up_features,
        users.rename_field('user_id', 'user_id1'),
        'user_id',
        'user_id1'
    ).drop_fields(['user_id1'])

    # Attach the product-level features.
    df = Join.apply(
        up_with_users,
        prd_features.rename_field('product_id', 'product_id1'),
        'product_id',
        'product_id1'
    ).drop_fields(['product_id1'])

    # Convert the Glue DynamicFrame to a Spark DataFrame to use the Spark writer.
    df_spark = df.toDF()

    # repartition(1) collapses the data into a single partition before writing,
    # so the CSV output is not split across several partitions.
    df_spark.repartition(1).write.mode('overwrite').format('csv').save(output_path, header='true')
    
# Run the job only when this code is executed as the main module
# (not when it is imported from elsewhere).
if __name__ == '__main__':
    main()
        

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,application_1648516126529_0004,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Reuse the running SparkContext (or start one) and wrap it in a GlueContext;
# the following cells perform all their reads through `glueContext`.
spark_context = SparkContext.getOrCreate()
glueContext = GlueContext(spark_context)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Load the `up_features_db` Parquet files from S3 into a Glue DynamicFrame.
up_features = glueContext.create_dynamic_frame_from_options(
    connection_type="parquet",
    connection_options={"paths": ["s3://imba-shawn/features/up_features_db/"]}
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Load the `prd_feature_db` Parquet files from S3 into a Glue DynamicFrame.
# Note the prefix is spelled "prd_feature_db" (singular), unlike the others.
prd_features = glueContext.create_dynamic_frame_from_options(
    connection_type="parquet",
    connection_options={"paths": ["s3://imba-shawn/features/prd_feature_db/"]}
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Load the `user_features_1_db` Parquet files from S3 into a Glue DynamicFrame.
user_features_1 = glueContext.create_dynamic_frame_from_options(
    connection_type="parquet",
    connection_options={"paths": ["s3://imba-shawn/features/user_features_1_db/"]}
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Load the `user_features_2_db` Parquet files from S3 into a Glue DynamicFrame.
user_features_2 = glueContext.create_dynamic_frame_from_options(
    connection_type="parquet",
    connection_options={"paths": ["s3://imba-shawn/features/user_features_2_db/"]}
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Sanity check: echo the object to confirm the load returned a DynamicFrame.
# This prints only the object's repr, not its schema or any rows.
up_features

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<awsglue.dynamicframe.DynamicFrame object at 0x7f3c1f044860>

In [8]:
# Combine the two user-level feature frames on user_id. The key is renamed on
# the first frame so the two key columns have distinct names for the join; the
# renamed copy is dropped afterwards, leaving a single `user_id` column.
users = Join.apply(
    user_features_1.rename_field('user_id', 'user_id1'),
    user_features_2,
    'user_id1',
    'user_id'
).drop_fields(['user_id1'])


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Step 1: attach the user-level features to the user/product rows.
# The right-hand key is renamed before the join and dropped after it so that
# only one `user_id` column survives.
up_with_users = Join.apply(
    up_features,
    users.rename_field('user_id', 'user_id1'),
    'user_id',
    'user_id1'
).drop_fields(['user_id1'])

# Step 2: attach the product-level features, using the same
# rename / join / drop pattern on `product_id`.
df = Join.apply(
    up_with_users,
    prd_features.rename_field('product_id', 'product_id1'),
    'product_id',
    'product_id1'
).drop_fields(['product_id1'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Convert the joined Glue DynamicFrame into a Spark DataFrame so the Spark
# DataFrame writer can be used in the export cell.
df_spark = df.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Echo the DataFrame; its repr lists the column names and types of the joined
# result without triggering a read of the rows.
df_spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[product_id: bigint, avg_cart_position: int, avg_of_days_since_prior_order: double, max_order_number: double, up_orders: bigint, user_distinct_products: int, prod_second_orders: int, prod_reorders: int, user_reorder_ratio: double, user_total_products: int, sum_of_days_since_prior_order: double, up_first_order: int, prod_orders: int, up_last_order: int, prod_first_orders: int, user_id: bigint]

In [12]:
# Export the joined features as CSV with a header row, replacing any previous
# output at the destination. repartition(1) collapses the data into a single
# partition first so the output is not split across several partitions.
(
    df_spark.repartition(1)
    .write
    .mode('overwrite')
    .format('csv')
    .save("s3://imba-shawn/output", header = 'true')
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…