# Codingan utk membaca data Streaming dari Kafka menggunakan Pyspark

In [2]:
# Import library utk kita membuat direktori tmpt kita menyimpan data
import os
# Disini library os jg dpt kita manfaatkan utk mendownload
# file jar yg kita perlukan pada saat kita menjalankan Pyspark
# Apabila file jar belum tersedia, maka akan didownload otomatis
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
# Kemudian, utk mengakses masalah I / O file, kita membutuhkan library io
import io
# Library utk men-load maupun men-dump data berformat JSON
import json
# Library utk meng-import fungsi delay
import time

Selanjutnya, kita akan meng-import library dari Pyspark yg kita perlukan. Perlu diperhatikan bahwa Apache Spark telah terinstall di komputer yang ingin kita jalankan. Setelah itu, pastikan pula bahwa library pyspark telah terinstall di versi Python yg kita gunakan. Install pyspark dgn cara mengetikkan "pip install pyspark" pada command prompt

In [3]:
#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing

Selanjutnya, kita perlu mengimpor library eksternal Python yang bernama Sastrawi. Sastrawi berfungsi utk men-stemming atau mendapatkan kata dasar dari suatu kata. Contohnya : menyentuh -> sentuh, membunuh -> bunuh, dll

In [4]:
# Inisialisasi fungsi Stemmer bahasa Indonesia
# Stemmer untuk membuang semua imbuhan dan mendapatkan kata dasarnya
from Sastrawi.Stemmer.StemmerFactory import StemmerFactory
factory = StemmerFactory()
stemmer = factory.create_stemmer()

Selanjutnya, kita perlu mengimpor dictionary lookup. Dictionary lookup ini berfungsi utk mengubah bentuk kata-kata alay menjadi kata bentuk formalnya. Contoh : gw -> saya, egp -> memang saya pikirin, sotoy -> sok tahu. Untuk meng-enrich kata-kata yang baru, kalian dapat menambahkannya sendiri di file formalizationDict.txt. (Perlu diperhatikan bahwa antara kata alay dan kata perbaikannya dipisahkan menggunakan tab)

In [5]:
lookup_dict = {}
# Dictionary utk lemmatize
with io.open("formalizationDict.txt", 'r') as f:
    for line in f:
        items = line.split()
        key, values = items[0], items[1:]
        lookup_dict[key] = ' '.join(values)

# Fungsi untuk menimpa kata-kata yang salah / alay dengan kata
# yang terdapat pada formalizationDict
# Contoh : gpp => tidak apa-apa
#          egp => emang saya pikirin
        
def _lookup_words(input_text):
    words = input_text.split() 
    new_words = []
    new_text = ""
    for word in words:
        if word.lower() in lookup_dict:
            word = lookup_dict[word.lower()]
        new_words.append(word)
        new_text = " ".join(new_words) 
    return new_text

Selanjutnya, kita perlu membuat fungsi standar untuk cleansing text dari tanda baca dan symbol dari text. Kita dapat menggunakan fungsi library re (regular expression).

In [7]:
# Removes punctuation, parentheses, question marks, etc., and leaves only alphanumeric characters
import re
strip_special_chars = re.compile("[^A-Za-z ]+")

def cleanSentences(string):
    string = string.lower().replace("<br />", " ")
    return re.sub(strip_special_chars, " ", string.lower())