In [None]:
pip install google-cloud-pubsub

In [None]:
import os
import time
from google.cloud import pubsub_v1
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode, AfterCount, Repeatedly
from datetime import datetime

In [None]:
def encode_byte_string(element):
    """Return the UTF-8 encoded bytes of ``str(element)``."""
    return str(element).encode('utf-8')

def custom_timestamp(elements):
    """Wrap a parsed row in a TimestampedValue keyed on column 7.

    Column 7 is converted with ``int`` and used as the element's timestamp;
    the row itself is passed through unchanged as the value.
    """
    event_time = int(elements[7])
    return beam.window.TimestampedValue(elements, event_time)

def calculateProfit(elements):
    """Return a copy of the row with its profit appended as a trailing column.

    Profit is ``(sell_price - buy_rate) * products_count`` where the inputs are
    read from columns 6, 5 and 4 respectively and parsed with ``int``.

    Args:
        elements: Sequence of string fields for one row.

    Returns:
        A new list containing every original field followed by the profit
        rendered as a string. The input sequence is left untouched.

    Raises:
        ValueError: If any of the three numeric columns cannot be parsed.
        IndexError: If the row has fewer than 7 columns.
    """
    buy_rate = int(elements[5])
    sell_price = int(elements[6])
    products_count = int(elements[4])
    profit = (sell_price - buy_rate) * products_count
    # Build a new row rather than appending in place: Beam user functions must
    # not mutate their input element, and in-place appends make the function
    # non-idempotent (a second call would add a second profit column).
    return list(elements) + [str(profit)]

------

# Fixed Window

In [None]:
# Fixed-window job: read the sales file, keep Mumbai/Bangalore rows, add a
# profit column, attach an event timestamp to each row, then sum the profit per
# key inside fixed event-time windows and write the totals to text files.
p = beam.Pipeline()

pubsub_data = (
    p
    # The source is a local text file even though the step label says pub sub.
    | 'Read from pub sub' >> beam.io.ReadFromText('store_sales.csv')
    | 'Remove extra chars' >> beam.Map(lambda line: line.strip())
    | 'Split Row' >> beam.Map(lambda line: line.split(','))
    # Column 1 is matched against the two city names.
    | 'Filter By Country' >> beam.Filter(lambda fields: fields[1] in ("Mumbai", "Bangalore"))
    | 'Create Profit Column' >> beam.Map(calculateProfit)
    | 'Apply custom timestamp' >> beam.Map(custom_timestamp)
    # Key is column 0; value is column 8 (presumably the profit column added
    # by calculateProfit -- confirm the file has exactly 8 columns).
    | 'Form Key Value pair' >> beam.Map(lambda fields: (fields[0], int(fields[8])))
    | 'Window' >> beam.WindowInto(window.FixedWindows(0.000001))
    | 'Sum values' >> beam.CombinePerKey(sum)
    | 'Write to pus sub' >> beam.io.WriteToText('output/pubsub')
)

# Execute the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()

In [None]:
!{'head -n 5 output/pubsub-00000-of-00001'}

------

# Sliding Window

In [None]:
# Sliding-window job: same preparation steps as the fixed-window job, but the
# profit is keyed on (column 0, column 1) and summed inside sliding windows.
p = beam.Pipeline()

pubsub_data = (
    p
    # The source is a local text file even though the step label says pub sub.
    | 'Read from pub sub' >> beam.io.ReadFromText('store_sales.csv')
    | 'Remove extra chars' >> beam.Map(lambda line: line.strip())
    | 'Split Row' >> beam.Map(lambda line: line.split(','))
    # Column 1 is matched against the two city names.
    | 'Filter By Country' >> beam.Filter(lambda fields: fields[1] in ("Mumbai", "Bangalore"))
    | 'Create Profit Column' >> beam.Map(calculateProfit)
    | 'Apply custom timestamp' >> beam.Map(custom_timestamp)
    # Composite key of columns 0 and 1; value is column 8 parsed as an int.
    | 'Form Key Value pair' >> beam.Map(lambda fields: ((fields[0], fields[1]), int(fields[8])))
    # Positional arguments to SlidingWindows are (size, period).
    | 'Window' >> beam.WindowInto(window.SlidingWindows(0.0001, 0.000001))
    | 'Sum values' >> beam.CombinePerKey(sum)
    | 'Write to pus sub' >> beam.io.WriteToText('output/pubsub')
)

# Execute the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()

In [None]:
!{'head -n 5 output/pubsub-00000-of-00001'}

------

# Session Windows

In [None]:
# Session-window job: same preparation steps as the other sales jobs, with the
# profit keyed on (column 0, column 1) and summed per session window.
p = beam.Pipeline()

pubsub_data = (
    p
    # The source is a local text file even though the step label says pub sub.
    | 'Read from pub sub' >> beam.io.ReadFromText('store_sales.csv')
    | 'Remove extra chars' >> beam.Map(lambda line: line.strip())
    | 'Split Row' >> beam.Map(lambda line: line.split(','))
    # Column 1 is matched against the two city names.
    | 'Filter By Country' >> beam.Filter(lambda fields: fields[1] in ("Mumbai", "Bangalore"))
    | 'Create Profit Column' >> beam.Map(calculateProfit)
    | 'Apply custom timestamp' >> beam.Map(custom_timestamp)
    # Composite key of columns 0 and 1; value is column 8 parsed as an int.
    | 'Form Key Value pair' >> beam.Map(lambda fields: ((fields[0], fields[1]), int(fields[8])))
    # The single argument to Sessions is the inactivity gap that closes a session.
    | 'Window' >> beam.WindowInto(window.Sessions(0.001))
    | 'Sum values' >> beam.CombinePerKey(sum)
    | 'Write to pus sub' >> beam.io.WriteToText('output/pubsub')
)

# Execute the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()

In [None]:
!{'head -n 5 output/pubsub-00000-of-00001'}

------

# Global Windows

In [None]:
def player_pair(element_list):
    """Return ``(column 1, 1)`` so occurrences can be counted per key."""
    key = element_list[1]
    return (key, 1)
  
def score_pair(element_list):
    """Return ``((column 3, column 4), 1)`` so occurrences can be counted per pair."""
    key = (element_list[3], element_list[4])
    return (key, 1)

# NOTE: this redefines the custom_timestamp declared earlier in the notebook
# (that one reads column 7). Once this cell has run, re-executing the earlier
# sales pipelines will pick up this column-16 version instead.
def custom_timestamp(elements):
    """Wrap a parsed row in a TimestampedValue keyed on column 16.

    Column 16 is stripped of surrounding whitespace, converted with ``int`` and
    used as the element's timestamp; the row is passed through as the value.
    """
    event_time = int(elements[16].strip())
    return beam.window.TimestampedValue(elements, event_time)

In [None]:
p = beam.Pipeline()

# Shared upstream: split every line into fields and attach its event timestamp.
pubsub_data = (
    p
    | 'Read from pub sub' >> beam.io.ReadFromText('mobile_game.txt')
    | 'Parse data' >> beam.Map(lambda line: line.split(','))
    | 'Apply custom timestamp' >> beam.Map(custom_timestamp)
)

# Branch 1: running count per player_pair key in the global window.
# No sink is attached to this branch, so its result is never written out.
player_score = (
    pubsub_data
    | 'Form k,v pair of (player_id, 1)' >> beam.Map(player_pair)
    | 'Window for player' >> beam.WindowInto(
        window.GlobalWindows(),
        trigger=Repeatedly(AfterCount(1)),
        accumulation_mode=AccumulationMode.ACCUMULATING,
    )
    | 'Group players and their score' >> beam.CombinePerKey(sum)
)

# Branch 2: running count per score_pair key, written to the text sink.
team_score = (
    pubsub_data
    | 'Form k,v pair of (team_score, 1)' >> beam.Map(score_pair)
    | 'Window for team' >> beam.WindowInto(
        window.GlobalWindows(),
        trigger=Repeatedly(AfterCount(1)),
        accumulation_mode=AccumulationMode.ACCUMULATING,
    )
    | 'Group teams and their score' >> beam.CombinePerKey(sum)
    | 'Write to pus sub' >> beam.io.WriteToText('output/pubsub')
)

# Execute the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()

In [None]:
!{'head -n 5 output/pubsub-00000-of-00001'}

------

# Assignment

In [None]:
class PointFn(beam.CombineFn):
    """CombineFn that averages its inputs via a ``(running_total, n_items)`` accumulator."""

    def create_accumulator(self):
        """Start with a zero total and a zero item count."""
        return (0.0, 0)

    def add_input(self, sum_count, input):
        """Fold one input value into the accumulator and return the new pair."""
        running_total, n_items = sum_count
        return running_total + input, n_items + 1

    def merge_accumulators(self, accumulators):
        """Merge partial accumulators by adding their totals and their counts."""
        totals, item_counts = zip(*accumulators)
        return sum(totals), sum(item_counts)

    def extract_output(self, sum_count):
        """Return the mean of the accumulated values, or NaN when the count is zero."""
        running_total, n_items = sum_count
        if not n_items:
            return float('NaN')
        return running_total / n_items

In [None]:
def calculate_battle_points(element_list):
    """Score one battle record and return ``(key, points)``.

    The key is ``game_id:player_id:weapon`` built from columns 0, 1 and 5.
    Points are the sum of three components:

    * battle time (column 15): 10-20 -> 4, 21-30 -> 3, 31-40 -> 2, above 40 -> 1,
      below 10 -> 0;
    * weapon ranking difference (column 6 minus column 13): >= 6 -> 3,
      >= 3 -> 2, otherwise 1;
    * map location (columns 7 and 14, whitespace-stripped): 3 if they differ.
    """
    game_id = element_list[0]
    player_id = element_list[1]
    weapon = element_list[5]

    my_weapon_ranking = int(element_list[6].strip())
    opp_weapon_ranking = int(element_list[13].strip())

    my_map_location = element_list[7].strip()
    opp_map_location = element_list[14].strip()

    battle_time = int(element_list[15].strip())

    # Battle-time component.
    if 10 <= battle_time <= 20:
        time_points = 4
    elif 21 <= battle_time <= 30:
        time_points = 3
    elif 31 <= battle_time <= 40:
        time_points = 2
    elif battle_time > 40:
        time_points = 1
    else:
        time_points = 0

    # Weapon-ranking component.
    ranking_gap = my_weapon_ranking - opp_weapon_ranking
    if ranking_gap >= 6:
        ranking_points = 3
    elif ranking_gap >= 3:
        ranking_points = 2
    else:
        ranking_points = 1

    # Map-location component.
    location_points = 3 if my_map_location != opp_map_location else 0

    total_points = time_points + ranking_points + location_points
    return game_id + ':' + player_id + ':' + weapon, total_points

In [None]:
def format_result(key_value_pair):
    """Render a ``(name, points)`` pair as one output line.

    ``name`` is split on ':'; the first two pieces are the game id and player
    id, and any remaining pieces are joined with spaces to form the weapon.
    ``points`` is truncated (not rounded) to two decimal places.
    """
    name, points = key_value_pair
    pieces = name.split(':')
    game_id = pieces[0]
    player_id = pieces[1]
    weapon = ' '.join(pieces[2:])
    # int() drops everything past the second decimal place before rescaling.
    truncated_points = int(points * 100) / 100
    return ''.join([
        game_id, ',', player_id, ', ', weapon, ', ',
        str(truncated_points), ' average battle points ',
    ])

In [None]:
# Assignment job: average battle points per game/player/weapon key within
# 30-second session windows, formatted and written to text files.
p = beam.Pipeline()

pubsub_data = (
    p
    | 'Read from pub sub' >> beam.io.ReadFromText('mobile_game.txt')
    | 'Parse data' >> beam.Map(lambda element: element.split(','))
    # Fix: attach the event time before windowing. Rows read from a text file
    # all carry the same default timestamp, so without this step Sessions(30)
    # merges every row of a key into a single session. Column 16 is the
    # timestamp column this notebook already uses for mobile_game.txt.
    # This must run before 'Calculate battle points', which discards the row
    # and keeps only (key, points).
    | 'Apply custom timestamp' >> beam.Map(
        lambda fields: beam.window.TimestampedValue(fields, int(fields[16].strip())))
    | 'Calculate battle points' >> beam.Map(calculate_battle_points)
    | 'Window for player' >> beam.WindowInto(window.Sessions(30))
    # PointFn averages the points of each key inside each session window.
    | 'Group by key' >> beam.CombinePerKey(PointFn())
    | 'Format results' >> beam.Map(format_result)
    | 'Write to pus sub' >> beam.io.WriteToText('output/pubsub')
)

# Execute the pipeline and block until it has finished.
result = p.run()
result.wait_until_finish()

In [None]:
!{'head -n 5 output/pubsub-00000-of-00001'}