Uzitecne python funkce

In [9]:
# Small demo of frequently used str / list operations.
# Every message is built with %-formatting and handed to print() as a single
# argument, so the output is the same whether print is the Python 2 statement
# or the Python 3 function.
my_string = "Hello cruel world"
my_list = my_string.split()  # split on whitespace -> list of words
print('Delka stringu je %d' % len(my_string))
# (my_list,) is wrapped in a tuple so the list is formatted as one value.
print('String obsahuje slova %s' % (my_list,))
print('Lowercase  %s' % my_string.lower())
print('Delka pole je %d' % len(my_list))
print('Prvni prvek v poli je %s' % my_list[0])
print('Posledni prvek v poli je %s' % my_list[-1])

Delka stringu je 17
String obsahuje slova ['Hello', 'cruel', 'world']
Lowercase  hello cruel world
Delka pole je 3
Prvni prvek v poli je Hello
Posledni prvek v poli je world


# Spustit interaktivni shell Spark-pythonu

`pyspark --master yarn --num-executors 4`

### word count nebo Hello hadoop

In [None]:
def split_string(verse):
    """Split one line of text into words.

    The line is split on any run of whitespace (spaces, tabs, ...), so
    repeated spaces do not produce empty "words" and a tab between two
    tokens separates them just like a space does.

    Args:
        verse: One line of input text.

    Returns:
        List of the whitespace-separated tokens; an empty list for an empty
        or whitespace-only line.
    """
    return verse.split()

# Classic word count. Only transformations are chained here and no action
# (take, collect, ...) is called, so this cell just defines `counts`.
lines = sc.textFile("/user/pascepet/bible.txt")
# One record per token produced by split_string.
words = lines.flatMap(split_string)
# Pair every token with an initial count of 1 ...
pairs = words.map(lambda token: (token, 1))
# ... and add the counts up per distinct token.
counts = pairs.reduceByKey(lambda left, right: left + right)

# samostatny ukol c.1

### Ukol 

Spocitat pocet unikatnich slov v kazdem versi a najit vers s nejvetsim poctem unikatnich slov

### Data

`hdfs:///user/pascepet/bible.txt`

### Ocekavany vystup

| verse_id | pocet_slov |
|:---------|:-----------|
|          |            |

In [None]:
# Load the input text; `sc` is expected to be the SparkContext supplied by the
# pyspark shell / notebook kernel.
lines = sc.textFile("/user/pascepet/bible.txt")

def find_verse_len(verse):
    """Count the distinct words of one verse.

    Args:
        verse: Input line made of a verse id, a tab character and the verse
            text.

    Returns:
        Tuple ``(verse id, number of distinct lower-cased words in the
        text)``. A line without a tab is returned whole as the id with a
        count of 0.
    """
    # partition() splits at the first tab only and never raises, so a blank
    # line or a text that itself contains a tab cannot abort the whole job
    # with an unpacking ValueError.
    title, _, text = verse.partition('\t')
    return title, len(set(text.lower().split()))

# One (verse id, distinct word count) pair per input line.
verse_lens = lines.map(find_verse_len)

# top(1, key=...) returns the pair with the highest count as a one-item list,
# the same shape sortBy(..., ascending=False).take(1) gives, without sorting
# the whole RDD just to read its first element.
verse_lens.top(1, key=lambda keyval: keyval[1])

### Bonus odfiltrovat stop-slova pomoci [Spark broadcast](https://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables)

In [None]:
# The stop words can be loaded into a set like this: every line of the local
# file becomes one entry, with the line-break characters removed.
with open('/tmp/pascepet/stopwords.txt') as stopwords_file:
    stopwords = {
        line.replace('\n', '').replace('\r', '')
        for line in stopwords_file
    }

In [None]:
# Wrap the stop-word set in a Spark broadcast variable; functions running on
# the RDD read it through stopwords_bc.value.
stopwords_bc = sc.broadcast(stopwords)

# Load the input text again for this cell.
lines = sc.textFile("/user/pascepet/bible.txt")

def find_verse_len_nostop(verse):
    """Count the distinct words of one verse, ignoring stop words.

    The stop words are read from the module-level broadcast variable
    ``stopwords_bc``.

    Args:
        verse: Input line made of a verse id, a tab character and the verse
            text.

    Returns:
        Tuple ``(verse id, number of distinct lower-cased words of the text
        that are not stop words)``. A line without a tab is returned whole as
        the id with a count of 0.
    """
    # partition() splits at the first tab only and never raises, so a blank
    # line or a text containing a tab cannot abort the job.
    title, _, text = verse.partition('\t')
    unique_words = set(text.lower().split())
    # difference() yields a set, so len() works on Python 2 and 3 alike
    # (len(filter(...)) fails on Python 3, where filter returns an iterator).
    unique_no_stop = unique_words.difference(stopwords_bc.value)
    return title, len(unique_no_stop)

# One (verse id, distinct non-stop word count) pair per input line.
verse_lens = lines.map(find_verse_len_nostop)

# top(1, key=...) returns the pair with the highest count as a one-item list,
# the same shape sortBy(..., ascending=False).take(1) gives, without sorting
# the whole RDD just to read its first element.
verse_lens.top(1, key=lambda keyval: keyval[1])

# samostatny ukol c.2

### Ukol 
Vypsat top 10 zakazniku s nejvyssi utratou/nejvetsim zustatkem za dane obdobi

### Data 

`hdfs:///user/pascepet/tranzakce/`

### Vstupni data

`csv soubory, oddelovac ','`

| id zakaznika | id protiuctu | castka | je odchozi |
|:-------------|:-------------|:-------|:-----------|
| 1202         | 2456         | 564    | True       |


### Ocekavany vystup

| id zakaznika | aktualni zustatek | utrata | 
|:-------------|:------------------|:-------|
| 1202         | -200              | 450    |



In [None]:
# Read everything under the transactions directory as lines of text; each
# line is parsed as one comma-separated transaction by parse_transaction.
transactions_rdd = sc.textFile("/user/pascepet/tranzakce/")

def parse_transaction(row):
    """Parse one comma-separated transaction line into a dict.

    Args:
        row: Text line whose first four comma-separated fields are customer
            id, counterparty account id, amount and the outgoing flag.

    Returns:
        Dict with keys ``'cust_id'`` and ``'b_party_id'`` (strings as read),
        ``'amount'`` (int) and ``'is_spending'`` (True only when the fourth
        field is exactly the text ``True``).

    Raises:
        IndexError: If the line has fewer than four fields.
        ValueError: If the amount field is not an integer.
    """
    fields = row.split(',')

    customer = fields[0]
    counterparty = fields[1]
    amount = int(fields[2])
    outgoing = fields[3] == 'True'

    return {
        'cust_id': customer,
        'b_party_id': counterparty,
        'amount': amount,
        'is_spending': outgoing,
    }

def map_transaction(t):
    """Turn one parsed transaction into a per-customer contribution.

    Args:
        t: Dict with the keys ``'cust_id'``, ``'amount'`` and
            ``'is_spending'``.

    Returns:
        Tuple ``(customer id, record)`` where ``record`` is a dict with
        ``'balance'`` (minus the amount for a spending transaction, plus the
        amount otherwise) and ``'spent'`` (the amount for a spending
        transaction, 0 otherwise).
    """
    if t['is_spending']:
        balance_change, spent = -t['amount'], t['amount']
    else:
        balance_change, spent = t['amount'], 0

    return t['cust_id'], {'balance': balance_change, 'spent': spent}

def reduce_records(x, y):
    """Merge two per-customer records by adding them field by field.

    Args:
        x: Dict with numeric ``'balance'`` and ``'spent'`` values.
        y: Dict with numeric ``'balance'`` and ``'spent'`` values.

    Returns:
        New dict whose ``'balance'`` and ``'spent'`` are the sums of the
        corresponding values of ``x`` and ``y``.
    """
    return {field: x[field] + y[field] for field in ('balance', 'spent')}

# Parse the raw lines, turn each transaction into a (customer id, record)
# pair and add the records up per customer.
transactions = transactions_rdd.map(parse_transaction)
cust_records = transactions.map(map_transaction)
cust_aggregates = cust_records.reduceByKey(reduce_records)

# Optional: cust_aggregates is queried twice below; calling
# cust_aggregates.cache() here would let the second query reuse the
# aggregated data instead of computing it again.

# top(10, key=...) returns the 10 records with the highest key in descending
# order, like sortBy(..., ascending=False).take(10), without sorting all
# customers first.
top_by_balance = cust_aggregates.top(10, key=lambda record: record[1]['balance'])
top_by_spent = cust_aggregates.top(10, key=lambda record: record[1]['spent'])

# A notebook cell displays only its last expression, so both rankings are
# returned together; previously the balance ranking was computed and dropped.
top_by_balance, top_by_spent

# samostatny ukol c.3

### Ukol
Pro kazdeho zakaznika spocitat podil odchozich volani a pomer poctu kontaktu k poctu volani

### Data

`hdfs:///user/pascepet/spark_sql/cdr_sample/`

### Vstup

| record_type | timestamp | duration | frommsisdn_prefix | frommsisdn | tomsisdn_prefix | tomsisdn |
|:------------|:----------|:---------|:------------------|:-----------|:----------------|:---------|

### Vystup

| frommsisdn | moc_% | number_ratio |
|:-----------|:------|:-------------|


In [None]:
# The data is stored in parquet format; parquet files are loaded with
# sqlContext.read.parquet, and .rdd converts the result to an RDD of rows.

rdd = sqlContext.read.parquet('/user/pascepet/spark_sql/cdr_sample/').rdd

# Drop the records whose record_type is 'callForwarding'.
calls = rdd.filter(lambda cdr: cdr['record_type'] != u'callForwarding')

def map_moc_call(cdr):
    """Turn one call record into a per-customer counter pair.

    Args:
        cdr: Record supporting lookup by the keys ``'frommsisdn'`` and
            ``'record_type'``.

    Returns:
        Tuple ``(frommsisdn, counters)`` where ``counters`` is a dict with
        ``'moc'`` (1 when record_type equals ``'mSOriginating'``, else 0) and
        ``'total'`` (always 1).
    """
    customer = cdr['frommsisdn']

    # 'mSOriginating' presumably marks a call originated by the customer.
    moc_flag = 0
    if cdr['record_type'] == 'mSOriginating':
        moc_flag = 1

    return customer, {'moc': moc_flag, 'total': 1}

# One (frommsisdn, {'moc': 0 or 1, 'total': 1}) pair per remaining record.
cust_calls = calls.map(map_moc_call)

def reduce_cust_calls(x, y):
    """Merge two per-customer call counters by adding them field by field.

    Args:
        x: Dict with numeric ``'moc'`` and ``'total'`` values.
        y: Dict with numeric ``'moc'`` and ``'total'`` values.

    Returns:
        New dict whose ``'moc'`` and ``'total'`` are the sums of the
        corresponding values of ``x`` and ``y``.
    """
    return {field: x[field] + y[field] for field in ('moc', 'total')}

# Add the counters up per frommsisdn.
cust_agg = cust_calls.reduceByKey(reduce_cust_calls)

def map_ratio(record):
    """Compute the share of 'moc' records for one customer.

    Args:
        record: Pair ``(customer id, counters)`` where ``counters`` has the
            numeric keys ``'moc'`` and ``'total'``.

    Returns:
        Tuple ``(customer id, moc / total)`` with the ratio as a float.

    Raises:
        ZeroDivisionError: If ``'total'`` is 0.
    """
    counters = record[1]
    # float() forces true division under Python 2 as well.
    return record[0], counters['moc'] / float(counters['total'])

# Builds the RDD of (frommsisdn, moc ratio) pairs.
# NOTE(review): no action (take, collect, ...) is called here, so no values
# are shown yet; the number_ratio column from the task is not computed either.
cust_agg.map(map_ratio)