In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Spark-on-Kubernetes configuration. Executors reach the driver (presumably this
# notebook kernel inside a JupyterHub pod) through a Kubernetes Service, so the
# driver host/port values must match that Service.
conf = (
    SparkConf()
    .setMaster("k8s://https://192.168.219.100:6443")  # Kubernetes API server (your master address)
    .setAll([
        ("spark.kubernetes.container.image", "joron1827/pyspark:latest"),  # Spark image name
        ("spark.driver.port", "2222"),  # needs to match the driver Service
        ("spark.driver.blockManager.port", "7777"),
        ("spark.driver.host", "driver-service.jupyterhub.svc.cluster.local"),  # needs to match the driver Service
        ("spark.driver.bindAddress", "0.0.0.0"),
        ("spark.kubernetes.namespace", "spark"),
        ("spark.kubernetes.authenticate.driver.serviceAccountName", "spark"),
        ("spark.kubernetes.authenticate.serviceAccountName", "spark"),
        ("spark.executor.instances", "3"),
        ("spark.kubernetes.container.image.pullPolicy", "IfNotPresent"),
        ("spark.app.name", "joronSpark"),
        ("spark.executor.cores", "3"),
    ])
)


In [4]:

# Create the SparkSession (or reuse an existing one) from the Kubernetes conf above.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

# This only started working after creating the RoleBinding for the "spark"
# service account (kubectl printed: rolebinding.rbac.authorization.k8s.io/spark created).

In [5]:
# Reminder of the HDFS input files read in the next cell (this cell only echoes the list).
'''
/stock_info/code.txt
/stock_info/stock_name.txt
'''

'\n/stock_info/code.txt\n/stock_info/stock_name.txt\n'

In [6]:
import pandas as pd

# Read each file as one Spark row per line and collect it to the driver as pandas.
# The two frames are paired up line-by-line in a later cell, so keep the order:
# codes first, names second.
code_df, name_df = (
    spark.read.text(path).toPandas()
    for path in (
        "hdfs://192.168.219.121:9000/stock_info/code.txt",
        "hdfs://192.168.219.121:9000/stock_info/stock_name.txt",
    )
)

                                                                                

In [4]:
# Place codes and names side by side (row i of one file pairs with row i of the
# other), then drop the first 3 and last 2 rows — presumably header/footer lines
# of the exported files; TODO confirm.
merged_df = pd.concat([code_df, name_df], axis=1).iloc[3:-2]

In [5]:
# Build a clean (stock_code, stock_name) table from the paired text lines.
# .values drops the old index and the duplicate "value" column names from concat.
stock_code_df = pd.DataFrame(merged_df.values, columns=['stock_code', 'stock_name'])

# concat(axis=1) pads with NaN when the code and name files have different line
# counts; fail loudly instead of silently mis-pairing codes and names.
assert stock_code_df.notna().all().all(), "code.txt and stock_name.txt are not line-aligned"

# Strip only the leading 'A' prefix. str.replace('A', '') would also delete any
# 'A' inside an alphanumeric code (such codes exist, e.g. '45226K').
stock_code_df['stock_code'] = stock_code_df['stock_code'].str.replace(r'^A', '', regex=True)

# astype returns a new frame, so the result must be assigned for it to take effect.
stock_code_df = stock_code_df.astype({'stock_code': str, 'stock_name': str})
stock_code_df

Unnamed: 0,stock_code,stock_name
0,000020,동화약품
1,000040,KR모터스
2,000050,경방
3,000070,삼양홀딩스
4,000075,삼양홀딩스우
...,...,...
942,452260,한화갤러리아
943,45226K,한화갤러리아우
944,453340,현대그린푸드
945,900140,엘브이엠씨홀딩스


In [60]:
import os
from getpass import getpass
from urllib.parse import quote_plus

import pandas as pd
from sqlalchemy import create_engine

# Connection settings come from environment variables so credentials are never
# stored in the notebook; the password falls back to an interactive prompt.
db_user = os.environ.get("STOCK_DB_USER", "root")
db_host = os.environ.get("STOCK_DB_HOST", "joron1827.asuscomm.com")
db_port = os.environ.get("STOCK_DB_PORT", "9480")
db_name = os.environ.get("STOCK_DB_NAME", "stock")
db_password = os.environ.get("STOCK_DB_PASSWORD") or getpass(f"Password for {db_user}@{db_host}: ")

# quote_plus keeps special characters (e.g. '@', '/') in credentials from breaking the URL.
engine = create_engine(
    f"postgresql://{quote_plus(db_user)}:{quote_plus(db_password)}@{db_host}:{db_port}/{db_name}"
)
del db_password  # don't keep the plain-text password in the kernel namespace

# Check that the database is reachable, then release the connection
# (a bare engine.connect() would leave it open).
with engine.connect():
    pass

<sqlalchemy.engine.base.Connection at 0x7fd1f53bb5b0>

In [62]:
# Overwrite the kospi_code table with the cleaned code/name list.
# NOTE(review): to_sql also writes the DataFrame index as an "index" column unless
# index=False is passed — confirm that column is wanted.
stock_code_df.to_sql("kospi_code", engine, if_exists="replace")


947

In [7]:
import copy

# Independent deep copy: the bookkeeping columns added next must not leak back
# into stock_code_df.
crawling_docs = copy.deepcopy(stock_code_df)

crawling_docs.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 947 entries, 0 to 946
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   stock_code  947 non-null    object
 1   stock_name  947 non-null    object
dtypes: object(2)
memory usage: 14.9+ KB


In [7]:
import numpy as np

# Add per-stock crawl bookkeeping columns, both starting at int64 zero.
# Presumably page_num is the crawl position and check_num a "done" flag (a later
# cell selects rows with check_num != 1) — TODO confirm.
for counter_col in ('page_num', 'check_num'):
    crawling_docs[counter_col] = np.zeros(crawling_docs.shape[0], dtype=np.int64)

crawling_docs.columns.values

array(['stock_code', 'stock_name', 'page_num', 'check_num'], dtype=object)

In [12]:
# Save a local gzip-compressed parquet snapshot of the bookkeeping table.
snapshot_path = './df.parquet'
crawling_docs.to_parquet(snapshot_path, compression='gzip')


In [9]:
# Export the crawl bookkeeping table to HDFS as CSV with a header row.
# Build the Spark DataFrame explicitly: `df` is not defined at this point when
# the notebook is run top to bottom, so the original `df.write...` raised NameError.
code_info_sdf = spark.createDataFrame(crawling_docs)
code_info_sdf.write.option("header", True).csv("hdfs://192.168.219.121:9000/crawling/code/code_info.csv")


                                                                                

In [14]:
# Load the bookkeeping table stored on HDFS and preview it.
# NOTE(review): no cell in this notebook writes code_info.parquet (the local
# snapshot is ./df.parquet and the HDFS export is CSV) — presumably it was
# uploaded separately; confirm.
df = spark.read.format("parquet").load("hdfs://192.168.219.121:9000/crawling/code/code_info.parquet")
df.show()

+----------+------------------+--------+---------+
|stock_code|        stock_name|page_num|check_num|
+----------+------------------+--------+---------+
|    000020|          동화약품|    1500|        0|
|    000040|          KR모터스|       0|        0|
|    000050|              경방|       0|        0|
|    000070|        삼양홀딩스|       0|        0|
|    000075|      삼양홀딩스우|       0|        0|
|    000080|        하이트진로|       0|        0|
|    000087|    하이트진로2우B|       0|        0|
|    000100|          유한양행|       0|        0|
|    000105|        유한양행우|       0|        0|
|    000120|        CJ대한통운|       0|        0|
|    000140|  하이트진로홀딩스|       0|        0|
|    000145|하이트진로홀딩스우|       0|        0|
|    000150|              두산|       0|        0|
|    000155|            두산우|       0|        0|
|    000157|          두산2우B|       0|        0|
|    000180|      성창기업지주|       0|        0|
|    000210|                DL|       0|        0|
|    000215|              DL우|       0|        0|
|    

In [4]:
# Load every crawl output whose file name starts with "23-05-15" (Hadoop glob)
# and preview it.
df = spark.read.format("parquet").load("hdfs://192.168.219.121:9000/crawling/23-05-15*.parquet")
df.show()

[Stage 4:>                                                          (0 + 1) / 1]

+------+----------------+------------------------------------+-----+---+---+-----------------+
|  code|            date|                               title|views|pos|neg|__index_level_0__|
+------+----------------+------------------------------------+-----+---+---+-----------------+
|005930|2022.02.28 02:27|         추짭한괴야tomsㅋb120ㅋtr...|  319|  1|  1|                0|
|005930|2022.02.28 02:22|         노인네들 말 들어, 파병하자!|  336|  3|  0|                0|
|005930|2022.02.28 02:19|                     한미가 동맹이면|  336|  2|  1|                0|
|005930|2022.02.28 02:15|        신천지 타고온 쥴리 주가조작 |  326|  1|  0|                0|
|005930|2022.02.28 02:15|   기르는 개만도 못한 노인네들의 ...|  345|  4|  5|                0|
|005930|2022.02.28 02:14|         @@지금까지 국회는 죄인들 @@|  327|  2|  0|                0|
|005930|2022.02.28 02:13|                         6.25.세대  |  332|  3|  4|                0|
|005930|2022.02.28 02:10|      @@왜  보수라고 하는사람들을 @@|  341|  1|  1|                0|
|005930|2022.02.28 02:09|   

                                                                                

In [20]:
# Pick the first stock not yet marked done (check_num != 1) and its page_num.
# Use positional iloc: after filtering, row label 0 disappears once the first
# stock is done, and the original .loc[0, ...] then raised KeyError.
pending_docs = crawling_docs[crawling_docs['check_num'] != 1]
if pending_docs.empty:
    raise ValueError("No stocks left to crawl: every row has check_num == 1")
codes, page = pending_docs[['stock_code', 'page_num']].iloc[0].values
print(type(codes), type(page))


<class 'str'> <class 'numpy.int64'>


In [3]:

# Load a single crawl output file from HDFS and preview it.
crawl_path = "hdfs://192.168.219.121:9000/crawling/23-05-08-May-59-1683579597-postgres.parquet"
df = spark.read.parquet(crawl_path)
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+------+----------------+------------------------------------+-----+---+---+-----------------+
|  code|            date|                               title|views|pos|neg|__index_level_0__|
+------+----------------+------------------------------------+-----+---+---+-----------------+
|093370|2023.02.20 10:18|              2022 결산실적발표 임박|  492|  3|  1|                0|
|093370|2023.02.20 10:08|                    개폭등 전 움직임|  547|  5|  3|                0|
|093370|2023.02.20 10:06|                     후성은 개털이다|  446|  1|  2|                0|
|093370|2023.02.20 09:56|              리튬주나 사는게 나을듯|  444|  1|  3|                0|
|093370|2023.02.20 09:53|                     계단식하락 시작|  423|  1|  2|                0|
|093370|2023.02.20 09:50|                              이러니|  450|  3|  0|                0|
|093370|2023.02.20 09:50|                                ㅠㅠ|  178|  0|  0|                0|
|093370|2023.02.20 09:45|                        대머리일가들|  448|  1|  0|                0|
|0933

                                                                                

In [7]:
# Stop the SparkSession (and its underlying SparkContext) once all work is done.
# The "Kubernetes client has been closed" WARN shown below was logged during this shutdown.
spark.stop()

23/05/18 17:56:28 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
