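# Create a Data-Processing Pipeline with Apache Beam

This notebook builds Apache Beam pipelines over the SMS Spam Collection dataset and counts word frequencies separately in spam and ham messages.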


In [None]:
!pip install apache-beam

In [None]:
# Configuration: Beam imports and pipeline options used by the cells below.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Pipeline options built without explicit flags.
beam_options = PipelineOptions()

class MyOptions(PipelineOptions):
  """Custom pipeline options that add ``--input`` and ``--output`` flags."""

  @classmethod
  def _add_argparse_args(cls, parser):
    # Register each custom flag on the argparse parser Beam hands us.
    for flag in ('--input', '--output'):
      parser.add_argument(flag)


In [None]:
# Download the SMS Spam Collection dataset into ./data and preview its first lines.
# Each line holds a label (ham or spam), a tab, then the message text.
!mkdir -p data
!gsutil cp gs://apachebeamdt/SMSSpamCollection data/
!head data/SMSSpamCollection

Copying gs://apachebeamdt/SMSSpamCollection...
/ [0 files][    0.0 B/466.7 KiB]                                                / [1 files][466.7 KiB/466.7 KiB]                                                
Operation completed over 1 objects/466.7 KiB.                                    
ham	Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...
ham	Ok lar... Joking wif u oni...
spam	Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham	U dun say so early hor... U c already then say...
ham	Nah I don't think he goes to usf, he lives around here though
spam	FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv
ham	Even my brother is not like to speak with me. They treat me like aids patent.
ham	As per your request 'Melle Melle (Oru 

In [None]:
# Reading from a text file.
# A pipeline that is only constructed never reads anything: the `with` block
# builds it and, on exit, runs it and waits for it to finish.

inputs_pattern = 'data/SMSSpamCollection'

with beam.Pipeline() as pipeline:
    outputs = (
        pipeline
          | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
    )

In [None]:
# Writing text file
inputs_pattern = 'data/SMSSpamCollection'

pipeline = beam.Pipeline()

outputs = (
    pipeline
      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
      | 'Write results' >> beam.io.WriteToText("ansoutput1", file_name_suffix = ".txt")
      | 'Print the text file name' >> beam.Map(print)
)

pipeline.run()

! head ansoutput1*.txt

ansoutput1-00000-of-00001.txt
ham	Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...
ham	Ok lar... Joking wif u oni...
spam	Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's
ham	U dun say so early hor... U c already then say...
ham	Nah I don't think he goes to usf, he lives around here though
spam	FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv
ham	Even my brother is not like to speak with me. They treat me like aids patent.
ham	As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune
spam	WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL34

**PTransforms**: Every text message is labelled either spam or non-spam (ham). To separate and analyze the two groups we apply PTransforms such as `Map`, `Filter`, `FlatMap` and `CombinePerKey`.

In [None]:
# Map: 1 way to distinguish spam SMS or ham SMS

import re

inputs_pattern = 'data/SMSSpamCollection'

pipeline = beam.Pipeline()

outputs = (
    pipeline
      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
      | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
      | 'Write results' >> beam.io.WriteToText("ansoutput2", file_name_suffix = ".txt")
      | 'Print the text file name' >> beam.Map(print)
)

pipeline.run()

! head ansoutput2*.txt

ansoutput2-00000-of-00001.txt
['ham', 'Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...']
['ham', 'Ok lar... Joking wif u oni...']
['spam', "Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's"]
['ham', 'U dun say so early hor... U c already then say...']
['ham', "Nah I don't think he goes to usf, he lives around here though"]
['spam', "FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv"]
['ham', 'Even my brother is not like to speak with me. They treat me like aids patent.']
['ham', "As per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune"]
['spam', 'WINNER!! As a valued network customer you have been selected to receivea 

In [None]:
# Filter: focus on counting word spam label

import re

inputs_pattern = 'data/SMSSpamCollection'

pipeline = beam.Pipeline()

outputs = (
    pipeline
      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
      | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == "spam")
      | 'Write results' >> beam.io.WriteToText("ansoutput3", file_name_suffix = ".txt")
      | 'Print the text file name' >> beam.Map(print)
)

pipeline.run()

! head ansoutput3*.txt

ansoutput3-00000-of-00001.txt
['spam', "Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's"]
['spam', "FreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, £1.50 to rcv"]
['spam', 'WINNER!! As a valued network customer you have been selected to receivea £900 prize reward! To claim call 09061701461. Claim code KL341. Valid 12 hours only.']
['spam', 'Had your mobile 11 months or more? U R entitled to Update to the latest colour mobiles with camera for Free! Call The Mobile Update Co FREE on 08002986030']
['spam', 'SIX chances to win CASH! From 100 to 20,000 pounds txt> CSH11 and send to 87575. Cost 150p/day, 6days, 16+ TsandCs apply Reply HL 4 info']
['spam', 'URGENT! You have won a 1 week FREE membership in our £100,000 Prize Jackpot! Txt the word: CLAIM to No: 81010 T&C www.dbuk.net LCCLTD POBOX 4403LD

In [None]:
# FlatMap: We know SMS lable spam, change element 
# FlatMap: function transform single input to iterable output.

import re

inputs_pattern = 'data/SMSSpamCollection'

pipeline = beam.Pipeline()

outputs = (
    pipeline
      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
      | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == "spam")
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
      | 'Write results' >> beam.io.WriteToText("ansoutput3", file_name_suffix = ".txt")
      | 'Print the text file name' >> beam.Map(print)
)

pipeline.run()

! head ansoutput3*.txt

ansoutput3-00000-of-00001.txt
Free
entry
in
a
wkly
comp
to
win
FA
Cup


In [None]:
# Combine 
inputs_pattern = 'data/SMSSpamCollection'

pipeline = beam.Pipeline()

outputs = (
    pipeline
      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
      | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
      | 'Keep only spam' >> beam.Filter(lambda line: line[0] == "spam")
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Write results' >> beam.io.WriteToText("ansoutput4", file_name_suffix = ".txt")
      | 'Print the text file name' >> beam.Map(print)
)

pipeline.run()

! head ansoutput4*.txt

ansoutput4-00000-of-00001.txt
('Free', 43)
('entry', 25)
('in', 77)
('a', 367)
('wkly', 10)
('comp', 10)
('to', 611)
('win', 38)
('FA', 4)
('Cup', 3)


In [None]:
# Full code Spam Ham Apache Beam

import apache_beam as beam
import re

inputs_pattern = 'data/SMSSpamCollection'
outputs_prefix_ham = 'outputs/fullcodeham'
outputs_prefix_spam = 'outputs/fullcodespam'

# Ham Word Count
with beam.Pipeline() as pipeline:
     ham = (
      pipeline
      | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
      | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
      | 'Keep only ham' >> beam.Filter(lambda line: line[0] == "ham")
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_c: str(word_c))
      | 'Write results' >> beam.io.WriteToText(outputs_prefix_ham, file_name_suffix = ".txt")
    )

# Spam Word Count
with beam.Pipeline() as pipeline1:
  spam = (
    pipeline1
    | 'Take in Dataset' >> beam.io.ReadFromText(inputs_pattern)
    | 'Separate to list' >> beam.Map(lambda line: line.split("\t"))
    | 'Filter out only spam' >> beam.Filter(lambda line: line[0] == "spam")
    | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line[1]))
    | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
    | 'Group and sum' >> beam.CombinePerKey(sum)
    | 'Format results' >> beam.Map(lambda word_c: str(word_c))
    | 'Write results' >> beam.io.WriteToText(outputs_prefix_spam, file_name_suffix = ".txt")
    )

print('Ham Word Count Head')
! head outputs/fullcodeham*.txt

print('Spam Word Count Head')
! head outputs/fullcodespam*.txt

Ham Word Count Head
('Go', 10)
('until', 21)
('jurong', 1)
('point', 12)
('crazy', 9)
('Available', 1)
('only', 118)
('in', 770)
('bugis', 6)
('n', 139)
Spam Word Count Head
('Free', 43)
('entry', 25)
('in', 77)
('a', 367)
('wkly', 10)
('comp', 10)
('to', 611)
('win', 38)
('FA', 4)
('Cup', 3)


In [None]:
# Visualize the Beam pipelines built in the "Full code" cell.
# `pipeline` (ham word count) and `pipeline1` (spam word count) are both drawn.
# The name `pipeline` is reassigned by every earlier cell, so this expects the
# "Full code" cell to have run just before it.

import apache_beam.runners.interactive.interactive_beam as ib

ib.show_graph(pipeline)
ib.show_graph(pipeline1)