# Word Count in Spark

### Setup

Spark를 먼저 세팅해봅시다.

In [1]:
# !pip install pyspark
# !pip install -U -q PyDrive
# !apt install openjdk-8-jdk-headless -qq
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

다음으로, Google Drive client 권한을 받아 Spark 실습을 위해 사용될 파일을 받아봅시다.

In [2]:
# from pydrive.auth import GoogleAuth
# from pydrive.drive import GoogleDrive
# from google.colab import auth
# from oauth2client.client import GoogleCredentials

# # Authenticate and create the PyDrive client
# auth.authenticate_user()
# gauth = GoogleAuth()
# gauth.credentials = GoogleCredentials.get_application_default()
# drive = GoogleDrive(gauth)

In [1]:
# id='1SE6k_0YukzGd5wK-E4i6mG83nydlfvSa'
# downloaded = drive.CreateFile({'id': id})
# downloaded.GetContentFile('pg100.txt')
import requests
text = requests.get("https://www.gutenberg.org/cache/epub/100/pg100.txt").content.decode("utf-8")
# print(text)
with open("./test01.txt", "w") as f:
    f.write(text)

위의 셀들을 실행하면,  *pg100.txt* 가 "Files" 탭에 다운된 것을 확인할 수 있습니다.

### 실습

*pg100.txt* 파일은 셰익스피어의 작품들의 사본들을 가지고 있습니다.

Spark를 사용하여 각 영어 letter로 시작하는 단어들의 갯수들을 출력하는 프로그램을 만들어보세요. 한마디로 각 letter에 대해, (non-unique)한 단어들의 총 갯수들을 세는 것입니다.

**대소문자는 무시하세요**, i.e. 단어들 전부 lower case라 생각하시면 됩니다. 또한, 알파벳이 아닌 character로 시작하는 단어들도 무시해도 됩니다. **문서 전체**에 대해 word count를 출력하세요(제목, 저자, 줄거리 포함). 라인 바꿈으로 -으로 연결된 단어들은 두 개의 다른 단어로 counting되어도 됩니다(e.g. "project", "pro-ject" 는 다른 단어).

출력에 대한 오차범위는 5미만으로 나오게 해주세요!

In [1]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd

# create the Spark Session
spark = SparkSession.builder.appName("spakr-01").getOrCreate()

# create the Spark Context
sc = spark.sparkContext



In [8]:
# CODE HERE
import time
from datetime import timedelta

document = sc.textFile("./pg100.txt")
print(document.take(2))
print("="*100)

start = time.time()
for i, line in enumerate(document.collect()):
    if i==10:
        break
    print(f"{i} | {line}")
print("="*100)
print("elapsed time :", timedelta(seconds=(time.time()-start)))

['The Project Gutenberg eBook of The Complete Works of William Shakespeare, by William Shakespeare', '']
0 | The Project Gutenberg eBook of The Complete Works of William Shakespeare, by William Shakespeare
1 | 
2 | This eBook is for the use of anyone anywhere in the United States and
3 | most other parts of the world at no cost and with almost no restrictions
4 | whatsoever. You may copy it, give it away or re-use it under the terms
5 | of the Project Gutenberg License included with this eBook or online at
6 | www.gutenberg.org. If you are not located in the United States, you
7 | will have to check the laws of the country where you are located before
8 | using this eBook.
9 | 
elapsed time : 0:00:00.605508


In [4]:
start = time.time()
counts = document.flatMap(lambda line: line.split())
for i, line in enumerate(counts.collect()):
    if i==10:
        break
    print(line)
print("="*100)
print("elapsed time :", timedelta(seconds=(time.time()-start)))

The
Project
Gutenberg
eBook
of
The
Complete
Works
of
William
elapsed time : 0:00:00.618911


In [3]:
start = time.time()
counts = document.flatMap(lambda line: line.split()).filter(lambda word: word.isalpha())\
    .map(lambda word: word[0].lower())\
        .map(lambda letter: (letter, 1))\
            .reduceByKey(lambda n1, n2: n1+n2).sortByKey(lambda letter_cnt:letter_cnt[0])
for i, line in enumerate(counts.collect()):
    if i==10:
        break
    print(line)
print("="*100)
counts.toDF(["letter","count"]).show(10)
print("elapsed time :", timedelta(seconds=(time.time()-start)))

('a', 76666)
('b', 35712)
('c', 19564)
('d', 20314)
('e', 12056)
('f', 27244)
('g', 14405)
('h', 48531)
('i', 56010)
('j', 1539)
+------+-----+
|letter|count|
+------+-----+
|     a|76666|
|     b|35712|
|     c|19564|
|     d|20314|
|     e|12056|
|     f|27244|
|     g|14405|
|     h|48531|
|     i|56010|
|     j| 1539|
+------+-----+
only showing top 10 rows

elapsed time : 0:00:04.518156


In [8]:
# CODE HERE
import time
from datetime import timedelta

document = sc.parallelize(sc.textFile("./pg100.txt").collect())


start = time.time()


counts = document.flatMap(lambda line: line.split()).filter(lambda word: word.isalpha())\
    .map(lambda word: word[0].lower())\
        .map(lambda letter: (letter, 1))\
            .reduceByKey(lambda n1, n2: n1+n2).sortByKey(lambda letter_cnt:letter_cnt[0])

for i, line in enumerate(counts.collect()):
    if i==10:
        break
    print(line)

print("="*100)

counts.toDF(["letter","count"]).show(10)


print("elapsed time :", timedelta(seconds=(time.time()-start)))

('a', 76666)
('b', 35712)
('c', 19564)
('d', 20314)
('e', 12056)
('f', 27244)
('g', 14405)
('h', 48531)
('i', 56010)
('j', 1539)
+------+-----+
|letter|count|
+------+-----+
|     a|76666|
|     b|35712|
|     c|19564|
|     d|20314|
|     e|12056|
|     f|27244|
|     g|14405|
|     h|48531|
|     i|56010|
|     j| 1539|
+------+-----+
only showing top 10 rows

elapsed time : 0:00:01.821722


In [5]:
# print(document)
document
for i, line in enumerate(document.collect()):
    if i == 10:
        break
    print(f"{i}|  {line}")

                                                                                

0|  The Project Gutenberg eBook of The Complete Works of William Shakespeare, by William Shakespeare
1|  
2|  This eBook is for the use of anyone anywhere in the United States and
3|  most other parts of the world at no cost and with almost no restrictions
4|  whatsoever. You may copy it, give it away or re-use it under the terms
5|  of the Project Gutenberg License included with this eBook or online at
6|  www.gutenberg.org. If you are not located in the United States, you
7|  will have to check the laws of the country where you are located before
8|  using this eBook.
9|  


In [6]:
flatdoc = document.flatMap(lambda line: line.split())#.filter(lambda word: word.isalpha())
for i, line in enumerate(flatdoc.collect()):
    if i == 5:
        break
    print(f"{i}|  {line}")

[Stage 1:>                                                          (0 + 2) / 2]

22/09/21 08:12:45 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 547, in main
    % ("%d.%d" % sys.version_info[:2], version)
RuntimeError: Python in worker has different version 3.6 than that in driver 3.10, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.for

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 3) (ljjDock executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 547, in main
    % ("%d.%d" % sys.version_info[:2], version)
RuntimeError: Python in worker has different version 3.6 than that in driver 3.10, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 547, in main
    % ("%d.%d" % sys.version_info[:2], version)
RuntimeError: Python in worker has different version 3.6 than that in driver 3.10, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [7]:
print(type(counts))

NameError: name 'counts' is not defined