<a target="_blank" href="../cluster" style="font-size:20px">All Applications (YARN)</a>

# Конфигурация Spark

https://spark.apache.org/docs/latest/configuration.html

https://spark.apache.org/docs/latest/running-on-yarn.html

RAM usage (in MB):
- Driver / Application Master: 512 + 384 = 896
- Executor: 512 + 576 = 1088
- Total: 896 + 1088 * 2 = 3072 (yarn limit)

In [1]:
! cat $SPARK_HOME/conf/spark-defaults.conf

cat: /conf/spark-defaults.conf: No such file or directory


# Создаем SparkContext
https://spark.apache.org/docs/latest/rdd-programming-guide.html

In [2]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName='jupyter')

In [3]:
import json

# Датасет Wikipedia

In [7]:
! ls -lh *MS*

-rw-rw-r-- 1 29999 29999 87M Nov 26  2020 MS-MF.csv


In [13]:
! cat MS-MF.csv | wc -l 

57604


In [10]:
! head -n 1 MS-MF.csv 




In [14]:
# make a sample
! head -n 1000 MS-MF.csv > MS-MFsample.csv

# Копируем файлы в HDFS

In [15]:
! hadoop fs -copyFromLocal MSMF /

In [17]:
! hadoop fs -ls -h /MSMF

Found 2 items
-rw-r--r--   1 root supergroup     86.7 M 2021-06-04 09:30 /MSMF/MS-MF.csv
-rw-r--r--   1 root supergroup      1.7 M 2021-06-04 09:30 /MSMF/MS-MFsample.csv


# Создаем RDD
https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

In [10]:
rdd = sc.parallelize(range(5))
rdd

PythonRDD[3] at RDD at PythonRDD.scala:53

In [115]:
rdd = sc.textFile("hdfs:///MSMF/MS-MF.csv")
rdd

hdfs:///MSMF/MS-MF.csv MapPartitionsRDD[100] at textFile at NativeMethodAccessorImpl.java:-2

# Действия над RDD
https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

In [12]:
rdd = sc.parallelize(range(5))

In [13]:
rdd.collect()

[0, 1, 2, 3, 4]

In [116]:
rdd.count()

57604

In [15]:
rdd.first()

0

In [16]:
rdd.mean()

2.0

In [17]:
rdd.take(2)

[0, 1]

In [18]:
rdd = sc.parallelize(["one", "two"])

In [19]:
rdd.saveAsTextFile("hdfs:///spark.txt")

In [20]:
! hadoop fs -cat "/spark.txt/*"

one
two


# Трансформации RDD
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

In [25]:
rdd = sc.textFile("hdfs:///wiki/wiki.jsonl")

In [19]:
rdd.sample(False, 0.1).collect()

[u'0P00000ESH,LU0757425763,Threadneedle (Lux) - American Select Class AU (USD Accumulation Shares),2000-07-28,US Large-Cap Growth Equity,2.0,,3.0,2.0,"The American Select Portfolio seeks to achieve capital appreciation by investing principally in the equity securities of companies domiciled in North America or which have significant North American operations. These include large,medium, and smaller companies. There will be no particular specialisation. The select investment approach means that the Portfolio has the flexibility to take significant stock and sector positions which may lead to increased levels of volatility.",Ashish Kochar,,S&P 500 TR USD,Russell 1000 Growth TR USD,Growth,187.85,Large,332.11,28.43,4.19,4.4,19.13,0.74,12.09,32.12,12.38,15.41,9.68,8.9,23.92,14.31,,,,,,,99.78,0.0,0.22,0.0,USA: 100.0,,11.59,20.26,,2.58,10.6,,19.32,,2.75,32.9,66.84,27.68,5.3,0.18,0.0,,,,,,,,,38.0,,1.8,1.5,2.3,10.48,8.46,21.24,3.0,23.0,5.11,0.0,10.79,0.0,0.0,0.0,0.31,0.18,0.0,0.0,0.0,0.0,0.0,20

In [156]:
import csv
import re
#rdd.map(lambda x: str_to_dict(list(unicodecsv.reader([x], encoding='UTF-8')))).take(6)
#rdd.map(lambda x:  str_to_dict(re.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",x))).take(6)
rdd.map(lambda x:  (re.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",x))).take(1)


[[u'ticker',
  u'isin',
  u'fund_name',
  u'inception_date',
  u'category',
  u'rating',
  u'analyst_rating',
  u'risk_rating',
  u'performance_rating',
  u'investment_strategy',
  u'investment_managers',
  u'dividend_frequency',
  u'fund_benchmark',
  u'morningstar_benchmark',
  u'equity_style',
  u'equity_style_score',
  u'equity_size',
  u'equity_size_score',
  u'price_prospective_earnings',
  u'price_book_ratio',
  u'price_sales_ratio',
  u'price_cash_flow_ratio',
  u'dividend_yield_factor',
  u'long_term_projected_earnings_growth',
  u'historical_earnings_growth',
  u'sales_growth',
  u'cash_flow_growth',
  u'book_value_growth',
  u'roa',
  u'roe',
  u'roic',
  u'bond_interest_rate_sensitivity',
  u'bond_credit_quality',
  u'average_coupon_rate',
  u'average_credit_quality',
  u'modified_duration',
  u'effective_maturity',
  u'asset_stock',
  u'asset_bond',
  u'asset_cash',
  u'asset_other',
  u'country_exposure',
  u'sector_basic_materials',
  u'sector_consumer_cyclical',
  u'sec

In [158]:
def str_to_dict(arr):
    keys=['ticker',
        'isin',
        'fund_name',
        'inception_date',
        'category',
        'rating',
        'analyst_rating',
        'risk_rating',
        'performance_rating',
        'investment_strategy',
        'investment_managers',
        'dividend_frequency',
        'fund_benchmark',
        'morningstar_benchmark',
        'equity_style',
        'equity_style_score',
        'equity_size',
        'equity_size_score',
        'price_prospective_earnings',
        'price_book_ratio',
        'price_sales_ratio',
        'price_cash_flow_ratio',
        'dividend_yield_factor',
        'long_term_projected_earnings_growth',
        'historical_earnings_growth',
        'sales_growth',
        'cash_flow_growth',
        'book_value_growth',
        'roa',
        'roe',
        'roic',
        'bond_interest_rate_sensitivity',
        'bond_credit_quality',
        'average_coupon_rate',
        'average_credit_quality',
        'modified_duration',
        'effective_maturity',
        'asset_stock',
        'asset_bond',
        'asset_cash',
        'asset_other',
        'country_exposure',
        'sector_basic_materials',
        'sector_consumer_cyclical',
        'sector_financial_services',
        'sector_real_estate',
        'sector_consumer_defensive',
        'sector_healthcare',
        'sector_utilities',
        'sector_communication_services',
        'sector_energy',
        'sector_industrials',
        'sector_technology',
        'market_cap_giant',
        'market_cap_large',
        'market_cap_medium',
        'market_cap_small',
        'market_cap_micro',
        'credit_aaa',
        'credit_aa',
        'credit_a',
        'credit_bbb',
        'credit_bb',
        'credit_b',
        'credit_below_b',
        'credit_not_rated',
        'holdings_n_stock',
        'holdings_n_bonds',
        'ongoing_cost',
        'management_fees',
        'environmental_score',
        'social_score',
        'governance_score',
        'sustainability_score',
        'sustainability_rank',
        'sustainability_percentage_rank',
        'involvement_abortive_contraceptive',
        'involvement_alcohol',
        'involvement_animal_testing',
        'involvement_controversial_weapons',
        'involvement_gambling',
        'involvement_gmo',
        'involvement_military_contracting',
        'involvement_nuclear',
        'involvement_palm_oil',
        'involvement_pesticides',
        'involvement_small_arms',
        'involvement_thermal_coal',
        'involvement_tobacco',
        'latest_nav_date',
        'nav_per_share_currency',
        'nav_per_share',
        'shareclass_size_currency',
        'shareclass_size',
        'fund_size_currency',
        'fund_size',
        'top5_regions',
        'top5_holdings',
        'fund_trailing_return_ytd',
        'fund_trailing_return_3years',
        'fund_trailing_return_5years',
        'fund_trailing_return_10years',
        'fund_return_2019',
        'fund_return_2018',
        'fund_return_2017',
        'fund_return_2016',
        'fund_return_2015',
        'fund_return_2020_q3',
        'fund_return_2020_q2',
        'fund_return_2020_q1',
        'fund_return_2019_q4',
        'fund_return_2019_q3',
        'fund_return_2019_q2',
        'fund_return_2019_q1',
        'fund_return_2018_q4',
        'fund_return_2018_q3',
        'fund_return_2018_q2',
        'fund_return_2018_q1',
        'fund_return_2017_q4',
        'fund_return_2017_q3',
        'fund_return_2017_q2',
        'fund_return_2017_q1',
        'fund_return_2016_q4',
        'fund_return_2016_q3',
        'fund_return_2016_q2',
        'fund_return_2016_q1',
        'fund_return_2015_q4',
        'fund_return_2015_q3',
        'fund_return_2015_q2',
        'fund_return_2015_q1',
        'quarters_up',
        'quarters_down']
    
    return dict(zip(keys,[(ar.encode('utf-8')).decode('utf-8') for ar in arr]))

In [23]:
rdd.flatMap(lambda x: json.loads(x)['text'].split()[:3]).take(9)

['April', 'April', 'is', 'August', 'August', '(Aug.)', 'Art', 'Art', 'is']

In [24]:
rdd.filter(lambda x: json.loads(x)['text'][0] == 'B').take(1)

['{"title": "Black pudding", "text": "Black pudding\\n\\nBlack pudding is an English name for zwarte pudding. It is food made by cooking down the blood of any mammal (usually pigs or cattle) with meat, fat or filler until it is thick enough to congeal (become firm or solid) when cooled.\\n\\nIn Great Britain, blood sausage is called \\"Black Pudding\\". The ingredients include pig\'s blood, suet, bread, barley and oatmeal. The most common kind of German \\"blutwurst\\" is made from fatty pork meat, beef blood and filler such as barley. Though already cooked and \\"ready to eat\\" it is usually served warm.\\n\\nOther kinds of blood sausage include \\"boudin noir\\" (France), \\"boudin rouge\\" (Creole and Cajun) and \\"morcilla\\" (Spain).\\n\\nA legend says that blood sausage was invented in a bet between two Bavarian butchers drunk on the alcoholic drink absinthe during the 14th century. Homer\'s \\"Odyssey\\" from Ancient Greece says that \\"As when a man besides a great fire has fi

In [25]:
(
    rdd
    .map(lambda x: json.loads(x)['text'].split())
    .takeOrdered(1, lambda x: len(x))
)

[['Garmisch-Partenkirchen',
  'Olympics',
  'Garmisch-Partenkirchen',
  'Olympics',
  'can',
  'mean:',
  '<templatestyles',
  'src="Dmbox/styles.css"',
  '/>']]

In [159]:
import unicodecsv
(
    rdd.map(lambda x:  str_to_dict(re.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",x)))
    .filter(lambda x: x['ticker'] != 'ticker').cache()
    
    #.groupBy(lambda x: ((x['fund_name']).encode('utf-8')).decode('utf-8') )
    #.collect()
    #.takeOrdered(1, lambda x: ('sber' in str((x['top5_holdings']))))
    .filter(lambda x: ('russia' in str.lower(str(str(x['fund_name'])))) )
    #.analyst_rating
    .count()
    #.take(3)
)


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 118.0 failed 1 times, most recent failure: Lost task 0.0 in stage 118.0 (TID 138, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1004, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1004, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-159-08470dde3508>", line 9, in <lambda>
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2013' in position 16: ordinal not in range(128)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1004, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1004, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-159-08470dde3508>", line 9, in <lambda>
UnicodeEncodeError: 'ascii' codec can't encode character u'\u2013' in position 16: ordinal not in range(128)

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


# Эмулируем MapReduce

In [26]:
rdd = sc.parallelize(["this is text", "some more text"])

In [27]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .collect()
)

[('this', 1), ('is', 1), ('text', 1), ('some', 1), ('more', 1), ('text', 1)]

In [28]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .groupByKey()
    .collect()
)

[('this', <pyspark.resultiterable.ResultIterable at 0x7f1119be77b8>),
 ('is', <pyspark.resultiterable.ResultIterable at 0x7f1119be7978>),
 ('more', <pyspark.resultiterable.ResultIterable at 0x7f1119be7860>),
 ('text', <pyspark.resultiterable.ResultIterable at 0x7f1119be74e0>),
 ('some', <pyspark.resultiterable.ResultIterable at 0x7f1119be7048>)]

In [29]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .groupByKey()
    .map(lambda x: (x[0], list(x[1])))
    .collect()
)

[('this', [1]), ('is', [1]), ('more', [1]), ('text', [1, 1]), ('some', [1])]

In [30]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .groupByKey()
    .map(lambda x: (x[0], len(x[1])))
    .collect()
)

[('this', 1), ('is', 1), ('more', 1), ('text', 2), ('some', 1)]

In [31]:
(
    rdd
    .flatMap(lambda x: [(w, 1) for w in x.split()])
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

[('this', 1), ('is', 1), ('more', 1), ('text', 2), ('some', 1)]

# Решаем WordCount

In [32]:
import sys
import re
import string

def mapper(line):
    text = json.loads(line)['text']
    text = re.sub(f'[^{re.escape(string.printable)}]', ' ', text)  # not printable to space
    words = text.lower().split()
    return [(word, 1) for word in words]

In [33]:
mapper("{\"text\": \"This  is  text\"}")

[('this', 1), ('is', 1), ('text', 1)]

In [34]:
%%time
(
    sc.textFile("hdfs:///wiki/sample.jsonl")
    .flatMap(mapper)
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, lambda x: -x[1])
)

CPU times: user 20.1 ms, sys: 0 ns, total: 20.1 ms
Wall time: 1.41 s


[('the', 34538),
 ('of', 15944),
 ('and', 12966),
 ('in', 12448),
 ('a', 11322),
 ('is', 10870),
 ('to', 9469),
 ('are', 5598),
 ('it', 4330),
 ('that', 4057)]

# Ловим ошибку в коде

In [35]:
import sys
import re
import string

def mapper(line):
    text = json.loads(line)['text123']  # error: missing key!
    text = re.sub(f'[^{re.escape(string.printable)}]', ' ', text)  # not printable to space
    words = text.lower().split()
    return [(word, 1) for word in words]

In [None]:
%%time
(
    sc.textFile("hdfs:///wiki/sample.jsonl")
    .flatMap(mapper)
    .reduceByKey(lambda a, b: a + b)
    .takeOrdered(10, lambda x: -x[1])
)

# Останавливаем Spark (и YARN приложение)

In [37]:
sc.stop()