In [1]:
from __future__ import division
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import re


In [None]:
try:
    sc = SparkContext(conf=SparkConf().setAppName("RiMBD_lab").setMaster("local").set("spark.cores.max", "4"))
except:
    pass

In [4]:
try:
    spark = SparkSession.builder.master("local").appName("RiMBD_lab").config("spark.some.config.option", "some-value").getOrCreate()
except:
    pass


In [5]:
spark

<pyspark.sql.session.SparkSession at 0x7f9a405ce810>

In [3]:
sc

<pyspark.context.SparkContext at 0x7f9a3815ebd0>

In [6]:
#Równoległe wywołanie polecenia range
#wyjście DataFrame 

firstDataFrame = spark.range(1000000)
#display(firstDataFrame)
firstDataFrame.show(5)

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
only showing top 5 rows



In [7]:
# Przykładowa transformacja:
# mnożenie kol ID przez 2
secondDataFrame = firstDataFrame.selectExpr("(id * 2) as value")


In [8]:
secondDataFrame.show(n=5)
secondDataFrame.take(5)

+-----+
|value|
+-----+
|    0|
|    2|
|    4|
|    6|
|    8|
+-----+
only showing top 5 rows



[Row(value=0), Row(value=2), Row(value=4), Row(value=6), Row(value=8)]

In [9]:
import os
notebook_path = os.path.abspath("DemoNotebook.ipynb")
print (notebook_path)

/home/jovyan/DemoNotebook.ipynb


Kilka akcji, które możemy wykonać na RDD (zwracają wynik):

In [13]:
#Tworzenie nowego RDD z pliku tekstowego bbc_news.txt:    
bbc_news = sc.textFile("/home/jovyan/bbc_news.txt")

In [11]:
# Number of items in this RDD
bbc_news.count()  

849

In [14]:
# First item in this RDD
bbc_news.first()  

u''

In [33]:
#usuwanie pustych linii
bbc_news = bbc_news.filter(lambda x: x != "")
bbc_news.first() 

u"A majority of Minneapolis City Council has pledged to dismantle the local police department, a significant move amid nationwide protests sparked by George Floyd's death last month."

In [34]:
# Number of items in this RDD
bbc_news.count()  

821

Oraz transformacje (zwracają wskaźnik do nowego RDD):

In [15]:
#Liczba wystąpień słowa 'black'
linesWithBlack = bbc_news.filter(lambda line: "black" in line)
print('To jest nowe RDD: ')
print (linesWithBlack)
print('Lines with \'black\': '+ str(linesWithBlack.count()))

#Liczba wystąpień słowa 'police'
linesWithPolice = bbc_news.filter(lambda line: "police" in line)
print('Lines with \'police\': '+ str(linesWithPolice.count()))


To jest nowe RDD: 
PythonRDD[19] at RDD at PythonRDD.scala:48
Lines with 'black': 121
Lines with 'police': 111


Klasyczne zliczanie wystąpień słów:

In [35]:

wordCounts = bbc_news.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

#wordCounts.collect() #unordered


wordCounts.takeOrdered(15, key=lambda(k, v): -v)

[(u'the', 1065),
 (u'to', 606),
 (u'a', 549),
 (u'of', 473),
 (u'and', 448),
 (u'in', 421),
 (u'that', 264),
 (u'was', 220),
 (u'for', 192),
 (u'is', 191),
 (u'on', 175),
 (u'I', 175),
 (u'have', 175),
 (u'been', 138),
 (u'black', 133)]

In [17]:
#Download stops words
!wget http://tacit.usc.edu/resources/stopwords_eng.txt

--2020-06-08 07:01:01--  http://tacit.usc.edu/resources/stopwords_eng.txt
Resolving tacit.usc.edu (tacit.usc.edu)... 68.181.64.53
Connecting to tacit.usc.edu (tacit.usc.edu)|68.181.64.53|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 669 [text/plain]
Saving to: ‘stopwords_eng.txt’


2020-06-08 07:01:01 (51.1 MB/s) - ‘stopwords_eng.txt’ saved [669/669]



In [19]:
sw = sc.textFile('/home/jovyan/stopwords_eng.txt')
sw.take(10)

[u'a',
 u'able',
 u'about',
 u'across',
 u'after',
 u'all',
 u'almost',
 u'also',
 u'am',
 u'among']

In [36]:
sw_collected= sw.collect()
no_stop_words_wordCounts = (wordCounts.filter(lambda (k,v): k not in sw_collected))

no_stop_words_wordCounts.takeOrdered(15, key=lambda(k, v): -v)


[(u'I', 175),
 (u'black', 133),
 (u'police', 117),
 (u'The', 92),
 (u'people', 81),
 (u'video', 78),
 (u'George', 77),
 (u'-', 75),
 (u'But', 64),
 (u'Mr', 54),
 (u'white', 54),
 (u'images', 54),
 (u'"I', 53),
 (u'said.', 51),
 (u'told', 48)]

In [21]:
wordCounts.takeOrdered(15, key=lambda(k, v): -v)

[(u'the', 1065),
 (u'to', 606),
 (u'a', 549),
 (u'of', 473),
 (u'and', 448),
 (u'in', 421),
 (u'that', 264),
 (u'was', 220),
 (u'for', 192),
 (u'is', 191),
 (u'on', 175),
 (u'I', 175),
 (u'have', 175),
 (u'been', 138),
 (u'black', 133)]

## Cache'owanie

In [37]:
bbc_news.cache()  

PythonRDD[58] at RDD at PythonRDD.scala:48

In [38]:
bbc_news.count()  

821

In [39]:
bbc_news.count()  

821

In [25]:
# dane:
df_list = [('Polska', 'Warszawa', 1),
           ('Polska', 'Krakow', 2),
           ('Niemcy', 'Berlin', 1),
           ('Niemcy', 'Hamburg', 3),
           ('Czechy', 'Praga', 1),
           ('Rosja', 'Moskwa', 1),
           ('Francja', 'Beauvais', 7)]
 
# budowanie data frame 
df = spark.createDataFrame(df_list,['kraj', 'miasto', 'liczba'])
 
# wyświetlenie
df.show()

+-------+--------+------+
|   kraj|  miasto|liczba|
+-------+--------+------+
| Polska|Warszawa|     1|
| Polska|  Krakow|     2|
| Niemcy|  Berlin|     1|
| Niemcy| Hamburg|     3|
| Czechy|   Praga|     1|
|  Rosja|  Moskwa|     1|
|Francja|Beauvais|     7|
+-------+--------+------+



## SQL

In [26]:
# tworzenie widoku SQL z DataFrame
df_sql = df.createOrReplaceTempView("dane")
 
# tworzenie zapytań SQL
sql_results = spark.sql("SELECT * FROM dane WHERE kraj='Polska'")
 
sql_results

DataFrame[kraj: string, miasto: string, liczba: bigint]

In [27]:
sql_results.show()

+------+--------+------+
|  kraj|  miasto|liczba|
+------+--------+------+
|Polska|Warszawa|     1|
|Polska|  Krakow|     2|
+------+--------+------+



In [None]:
def parse_article(line):
    try:
        article_id, text = unicode(line.rstrip()).split('\t', 1)
        text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
        words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
        return words
    except ValueError as e:
        return []

In [29]:
#Nowe RDD zawierające artykuły z wikipedii:    

wiki = sc.textFile("/data/wiki/en_articles_part/articles-part", 16).map(parse_article)
result = wiki.take(1)[0]


In [30]:
for word in result[:50]:
    print word

Anarchism
Anarchism
is
often
defined
as
a
political
philosophy
which
holds
the
state
to
be
undesirable
unnecessary
or
harmful
The
following
sources
cite
anarchism
as
a
political
philosophy
Slevin
Carl
Anarchism
The
Concise
Oxford
Dictionary
of
Politics
Ed
Iain
McLean
and
Alistair
McMillan
Oxford
University
Press
2003
However
others
argue


In [51]:
def pairs_starting_from_word(words, first_word='word'):
    pairs = []
    
    for i, word in enumerate(words[:-1]):
        if (word == first_word):
            pair = '{} — {}'.format(word, words[i+1])
            cnt = 1
            pairs.append((pair, cnt))
        else:
            continue
    return pairs



In [52]:
# lowercase all words
wiki_lower = wiki.map(lambda words: [x.lower() for x in words])

In [64]:
# find pairs starting from defined word
wiki_pairs = wiki_lower.flatMap(lambda x: pairs_starting_from_word(x, 'gazeta'))#

In [69]:
# filtering empty elements
wiki_pairs = wiki_pairs.filter(lambda x: x != [])


In [70]:
# aggregate counters
wiki_red = wiki_pairs.reduceByKey(lambda a, b: a + b, numPartitions=16)

In [68]:
result = wiki_red.collect()
for pair, cnt in result:
    print '{}\t{}'.format(pair, cnt)

gazeta — have	1
gazeta — do	1
gazeta — without	1
gazeta — the	1
gazeta — chojnowska	1


In [None]:
# sort values by key
wiki_red_sorted = wiki_red.sortByKey()

In [None]:
result = wiki_red_sorted.collect()
for pair, cnt in result:
    print '{}\t{}'.format(pair, cnt)