In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Build the Glue / Spark entry points used by every cell below.
# getOrCreate() returns the already-running SparkContext when one exists and
# creates one otherwise, so this cell no longer depends on an implicit `sc`
# global being injected by the kernel.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

The code failed because of a fatal error:
	Error sending http request and maximum retry encountered..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.


# 读取位于S3中的数据，有两种读取方式：通过Glue表（视频数据）和直接读取文件（owner数据）

In [None]:
# Video stats: loaded through the Glue Data Catalog table, then converted from a
# DynamicFrame to a Spark DataFrame.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="livecodingdemo",
    table_name="raw_video_stats",
    transformation_ctx="datasource0",
)
df_video = datasource0.toDF()

# Owner stats: read directly from the JSON files under the S3 prefix.
df_owner = spark.read.json("s3://aws-demo-live-code-demo/data/owner-stats/")

In [None]:
# Preview the first five owner rows.
df_owner.show(n=5)

In [None]:
# Preview the first five video rows.
df_video.show(n=5)

# owner表中包括每小时爬取的数据，提取最新的数据做成当前快照

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Number each owner's (`mid`) records from newest to oldest crawl_date, then keep
# only record #1, i.e. the most recent crawl per owner.
newest_first_per_owner = Window.partitionBy("mid").orderBy(desc("crawl_date"))
rank_owner = df_owner.withColumn("rank", row_number().over(newest_first_per_owner))
filter_owner = rank_owner.filter(rank_owner["rank"] == 1)


# 其中tag字段是一个Array结构的，可读性不够，需要二次处理

In [None]:
from pyspark.sql.functions import explode
# Flatten the `tag` array into one column per category.
#
# Steps:
#   1. explode() produces one row per element of `tag` (rows whose `tag` is null or
#      empty produce no output rows).
#   2. Each category key is pulled out of the exploded element into its own column.
#   3. The rows are collapsed back to one per owner, taking the max of each tag column.
#
# Both the select and the aggregation are driven by the same list so the column
# aliases cannot drift apart (the previous code wrote the music column out as
# 'tag:mucis').
owner_columns = ['mid', 'follower', 'video_counts', 'crawl_date']

# (key inside a tag element, output column name), listed in df_final column order.
tag_columns = [
    ("游戏", 'tag:game'),
    ("生活", 'tag:life'),
    ("知识", 'tag:knowledge'),
    ("纪录片", 'tag:documentary'),
    ("美食", 'tag:food'),
    ("音乐", 'tag:music'),
]

df_flat = filter_owner.select(owner_columns + [explode('tag').alias('flat')])

df = df_flat.select(
    owner_columns
    + [df_flat['flat'].getItem(tag_key).alias(column_name) for tag_key, column_name in tag_columns]
)

# `max` here is pyspark.sql.functions.max (brought in by the star import in an earlier
# cell), not the Python builtin.
df_final = df.groupBy(*owner_columns).agg(
    *[max(column_name).alias(column_name) for tag_key, column_name in tag_columns]
)

In [None]:
# Preview the flattened owner snapshot (20 rows is show()'s default).
df_final.show(n=20)

# 把处理结果覆写回S3


In [None]:
# Replace the contents of the "latest" prefix with the current owner snapshot (Parquet).
(
    df_final.write
    .mode("overwrite")
    .parquet("s3://aws-demo-live-code-demo/data/owner-stats-latest/")
)

# 数据格式和结构转化，提高查询效率
## 方法一：通过时间戳判断新增数据

In [None]:
import time
from datetime import datetime,timedelta
# Select the owner rows whose year/month/day columns match yesterday (local time).
# Subtracting a timedelta rolls the month and year back correctly on the 1st of a
# month / 1st of January, and taking a single timestamp keeps y, m and d consistent
# even if the cell runs across midnight.
yesterday = datetime.now() - timedelta(days=1)
y = yesterday.year
m = yesterday.month
d = yesterday.day

df_delta = df_owner.filter(
    (df_owner.year == y) & (df_owner.month == m) & (df_owner.day == d)
)

# Keep only the columns needed downstream and drop exact duplicate rows.
df_owner_final = df_delta.select(
    'crawl_date', 'follower', 'following', 'mid', 'video_counts'
).distinct()

In [None]:
# Append yesterday's owner rows to the clean dataset as Parquet, partitioned by `mid`.
# NOTE(review): append mode means re-running this cell for the same day writes the
# same rows again.
(
    df_owner_final.write
    .mode("append")
    .partitionBy("mid")
    .parquet("s3://aws-demo-live-code-demo/data/owner-stats-clean/")
)

## 方法二：通过表的bookmark判断新增数据

In [None]:
# Cast the `mid` column of the catalog DynamicFrame to string.
resolvechoice2 = datasource0.resolveChoice(specs=[("mid", "cast:string")])

# Write the result to S3 as Parquet, partitioned by `mid` and `year`.
# NOTE(review): transformation_ctx is the key job bookmarks use to track state;
# bookmarks presumably also need job.init()/job.commit() and a bookmark-enabled job
# run, none of which are visible in this notebook - confirm before relying on
# incremental behaviour here.
glueContext.write_dynamic_frame.from_options(
    frame=resolvechoice2,
    connection_type="s3",
    connection_options={
        "path": "s3://aws-demo-live-code-demo/data/video-stats-clean/",
        "partitionKeys": ["mid", "year"],
    },
    format="parquet",
    transformation_ctx="datasink0",
)


In [None]:
# Schema of the catalog DynamicFrame before the `mid` cast.
datasource0.printSchema()

In [None]:
# Schema after resolveChoice, for comparison with the original schema.
resolvechoice2.printSchema()

In [None]:
# Row count of the owner DataFrame read from the JSON files.
df_owner.count()

In [None]:
# Record count of the video-stats DynamicFrame loaded from the Glue catalog.
datasource0.count()