In [1]:
import findspark

In [2]:
# Initialise findspark before the pyspark imports in the next cell.
# NOTE(review): presumably this makes the local Spark installation importable;
# confirm against the findspark documentation for this environment.
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.conf import RuntimeConfig
from pyspark.sql.types import DoubleType
import functools
import pandas as pd
import time
import os

In [4]:
# HDFS location of the book CSV that is loaded into `df` below.
# NOTE(review): the file name 'book_infoo.csv' may be a typo for 'book_info.csv' —
# confirm against the actual HDFS listing before changing it.
books = 'hdfs://master:9000/user/practice/book_infoo.csv'

In [5]:
# Start (or attach to) the Spark session used to load the book CSV.
# Despite its name, `sc` is a SparkSession, not a SparkContext.
sc = (
    SparkSession.builder
    .appName('books')
    .getOrCreate()
)

# Load the book CSV with a header row and inferred column types; the
# 'multiline' option is enabled so quoted fields may span several lines.
book_reader = sc.read.option('multiline', True)
df = book_reader.csv(books, header=True, inferSchema=True)

In [6]:
# Columns of the book table that are not used by the keyword analysis below.
unused_book_columns = [
    'sub_category', 'publish_date',
    'aladin_price', 'aladin_star', 'aladin_review',
    'kyobo_price', 'kyobo_star', 'kyobo_review',
    'yes24_price', 'yes24_star', 'yes24_review',
    'interpark_price', 'interpark_star', 'interpark_review',
    'min price', 'avg stars', 'num of reviews',
    'publisher', 'book_author', 'book_title', 'book_id', 'book_trailer',
]

# Remove them in a single pass.
df = df.drop(*unused_book_columns)

In [7]:
# Keep only the category and the ISBN, exposing the ISBN under the name
# 'isbn' so it matches the join key used for the keyword table.
isbn_column = df['book_isbn'].alias('isbn')
df = df.select('main_category', isbn_column)

In [8]:
# Session handle used for reading the keyword CSV below.
# NOTE(review): a session was already obtained as `sc` above, so getOrCreate()
# is expected to return that same session — confirm, and consider using one name.
spark = SparkSession.builder.getOrCreate()

In [9]:
# Load the keyword CSV (header row, inferred types) and discard the '_c0' column.
# NOTE(review): '_c0' is presumably an unnamed index column written by the
# tool that exported the CSV — confirm.
key_all = (
    spark.read
    .csv(
        'hdfs://master:9000/user/practice/key_2021.csv',
        header=True,
        inferSchema=True,
    )
    .drop('_c0')
)

In [10]:
# Rename 'isbn13' to 'isbn' so both tables share the same join-key name,
# and keep only the keyword term and its frequency alongside it.
key_all = key_all.select(
    key_all['isbn13'].alias('isbn'),
    'term',
    'freq',
)

In [11]:
# Attach each book's category to its keyword rows by matching on 'isbn'.
# Inner join: books without keywords and keywords without a book are dropped.
keyword = df.join(key_all, on='isbn', how='inner')

In [12]:
# Preview up to 20 rows of the joined table, converted to pandas for rich display.
keyword.limit(20).toPandas()

Unnamed: 0,isbn,main_category,term,freq
0,9788935213344,자기계발,소학,9
1,9788935213344,자기계발,공부,8
2,9788935213344,자기계발,어른,8
3,9788935213344,자기계발,사람,7
4,9788935213344,자기계발,천년의내공,6
5,9788935213344,자기계발,다산의마지막공부,6
6,9788935213344,자기계발,자신,6
7,9788935213344,자기계발,단단,5
8,9788935213344,자기계발,시작,4
9,9788935213344,자기계발,다산,4


In [13]:
# 'isbn' served as the join key above; drop it before the rows are
# aggregated by category and term in the next cell.
keyword = keyword.drop('isbn')

In [14]:
# Total the keyword frequency per (category, term) pair.
# The dict form of agg() names the result column 'sum(freq)', which the later
# renaming cell relies on, and it does not depend on the star import of
# pyspark.sql.functions shadowing Python's built-in sum().
category_term_groups = keyword.groupBy('main_category', 'term')
keyword = category_term_groups.agg({'freq': 'sum'})

In [None]:
# Preview up to 20 rows of the per-category, per-term totals.
keyword.limit(20).toPandas()

In [17]:
# Discard the terms 'br' and 'div'.
# NOTE(review): these look like leftover HTML tag names from the scraped text — confirm.
excluded_terms = ('br', 'div')
keyword = keyword.filter(~keyword['term'].isin(*excluded_terms))

In [19]:
# Preview up to 10 rows after the 'br' / 'div' terms have been filtered out.
keyword.limit(10).toPandas()

Unnamed: 0,main_category,term,freq
0,가정/생활/요리,아이,1200
1,가정/생활/요리,고민,416
2,가정/생활/요리,부모,401
3,가정/생활/요리,엄마,391
4,가정/생활/요리,공부,330
5,가정/생활/요리,와인,319
6,가정/생활/요리,샐러드,271
7,가정/생활/요리,수학,262
8,가정/생활/요리,방법,250
9,가정/생활/요리,아이들,225


In [15]:
# Expose the aggregated 'sum(freq)' column under the plain name 'freq'.
keyword = keyword.select(
    'main_category',
    'term',
    col('sum(freq)').alias('freq'),
)

# Rank terms within each category: category A-Z, highest frequency first,
# ties broken alphabetically by term.
keyword = keyword.orderBy(
    ['main_category', 'freq', 'term'],
    ascending=[True, False, True],
)

In [None]:
# Preview up to 20 rows of the renamed and sorted keyword table.
keyword.limit(20).toPandas()

In [20]:
# Show up to 10 keyword rows for the single category '시/에세이'.
in_category = keyword['main_category'] == '시/에세이'
keyword.where(in_category).limit(10).toPandas()

Unnamed: 0,main_category,term,freq
0,시/에세이,사랑,373
1,시/에세이,사람,339
2,시/에세이,시인,332
3,시/에세이,여행,210
4,시/에세이,시집,206
5,시/에세이,마음,184
6,시/에세이,세상,154
7,시/에세이,시간,144
8,시/에세이,자신,143
9,시/에세이,인생,128


In [None]:
# Persist the keyword table as CSV with a header row.
# coalesce(1) collapses the DataFrame to one partition before writing.
#
# Uses Spark's built-in CSV writer, consistent with the built-in CSV reader
# used for both inputs above, instead of the legacy Spark 1.x
# 'com.databricks.spark.csv' source name. Options and target path are unchanged.
#
# NOTE(review): no save mode is set, so re-running this cell against an
# existing target is expected to fail — confirm whether mode('overwrite') is wanted.
# NOTE(review): the path has no scheme; it presumably resolves against the
# cluster's default filesystem rather than the local disk — confirm.
(
    keyword
    .coalesce(1)
    .write
    .option('header', 'true')
    .csv('/home/hadoop/key_ver2')
)