In [0]:
%spark.pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Connection / source-table settings used by this cell.
CASSANDRA_HOST = "172.21.0.6"
SOURCE_KEYSPACE = "trend"
SOURCE_TABLE = "cardhistory"

# Build (or reuse) a YARN-backed session with the Cassandra connector pointed at the cluster.
# NOTE(review): if the interpreter has already created `spark`, getOrCreate() returns that
# existing session and some builder settings may not take effect — confirm the connection
# host is actually applied.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("card 201909 test1")
    .config("spark.cassandra.connection.host", CASSANDRA_HOST)
    .getOrCreate()
)

# Load the `cardhistory` table from the `trend` keyspace as a DataFrame.
df = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table=SOURCE_TABLE, keyspace=SOURCE_KEYSPACE)
    .load()
)

df.show()

In [1]:
%spark.pyspark

from pyspark.sql.functions import rank, col, count, when
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

# Number of highest-spend categories kept per (userid, month); the rest are merged into "etc".
TOP_N_CATEGORIES = 4

# Reduce each row to (userid, month prefix of date, category, amount).
# NOTE(review): assumes `date` is a string beginning with yyyyMM (e.g. "20190915"); if the
# column is a Cassandra date/timestamp, substring(1, 6) would produce "2019-0" — confirm schema.
tmp_df = df.select(df.userid, F.substring(df.date, 1, 6).alias('date'), df.category, df.amount)
# Cast to 64-bit: a 32-bit int cast turns out-of-range values into null (or wraps them),
# which would silently drop or corrupt them in the sums below. sum() already returns bigint,
# so the downstream column types are unchanged.
tmp_df = tmp_df.withColumn("amount", col("amount").cast("bigint"))


# Total amount per userid, month and category.
df_grouped = tmp_df.groupBy("userid", "date", "category").agg(F.sum("amount").alias("total_amount"))

# Rank categories within each (userid, month) by total_amount, highest first.
# The secondary key on category makes tie-breaking deterministic; without it, categories
# with equal totals are ordered arbitrarily and the top-N membership can change between runs.
windowSpec = Window.partitionBy("userid", "date").orderBy(
    col("total_amount").desc(), col("category").asc()
)
df_ranked = df_grouped.withColumn("rank", F.row_number().over(windowSpec))

# Keep the top-N categories as-is and relabel every other category as 'etc'.
# NOTE(review): if a real category is literally named "etc" and ranks in the top N, it is
# merged with the overflow bucket in the next step — confirm that cannot happen.
df_final = df_ranked.withColumn(
    "category",
    when(col("rank") <= TOP_N_CATEGORIES, col("category")).otherwise("etc"),
)

# Re-aggregate so all 'etc' rows collapse into one total per (userid, month).
df_result = df_final.groupBy("userid", "date", "category").agg(F.sum("total_amount").alias("total_amount"))

# Display the result.
df_result.show()

In [2]:
%spark.pyspark
from pyspark.sql.functions import monotonically_increasing_id

# Attach a generated id to every aggregated row and expose total_amount under the
# column name `totalamount` used by the target table.
# NOTE(review): per the Spark docs, monotonically_increasing_id() is unique but not
# consecutive and depends on partitioning. The ids shown here are not guaranteed to match
# those written in the next cell (the plan is re-evaluated) or those from a previous run.
# If `id` is the Cassandra primary key, re-running the append may overwrite earlier rows —
# consider a deterministic key (e.g. derived from userid/date/category) and confirm
# against the table schema.
my_consumption = df_result.select(
    monotonically_increasing_id().alias('id'),
    "userid",
    "date",
    "category",
    df_result["total_amount"].alias("totalamount"),
)

# Display the result.
my_consumption.show()

In [3]:
%spark.pyspark
# Append the rows into trend.myconsumption, writing at Cassandra consistency level ONE.
(
    my_consumption.write
    .format('org.apache.spark.sql.cassandra')
    .mode('append')
    .options(**{
        "keyspace": "trend",
        "table": "myconsumption",
        "spark.cassandra.output.consistency.level": "ONE",
    })
    .save()
)

# Test: read the target table back and display it.
df_test = (
    spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(table="myconsumption", keyspace="trend")
    .load()
)
df_test.show()

In [4]:
%spark.pyspark
