### Apache Beam Local 실행 구현  및  처리 함수 분석 (For Dataflow 실습) - Windows용

In [1]:
# apache-beam 설치
# !pip install apache-beam

In [2]:
! mkdir tmp
! mkdir javahelp
! copy IsPopular.java .\javahelp

        1개 파일이 복사되었습니다.


In [3]:
import apache_beam as beam
import sys

# 기존 출력 파일 삭제
import os
output_text_file = "tmp/output-00000-of-00001"
if (os.path.isfile(output_text_file)):
    os.remove(output_text_file)
    
def my_grep(line, term):
   if line.startswith(term):
      yield line

if __name__ == '__main__':
   p = beam.Pipeline(argv=sys.argv)
   input = 'javahelp/IsPopular.java'
   output_prefix = 'tmp/output'
   searchTerm = 'import'

   # find all lines that contain the searchTerm
   (p
      | 'GetJava' >> beam.io.ReadFromText(input)
      | 'Grep' >> beam.FlatMap(lambda line: my_grep(line, searchTerm) )
      | 'write' >> beam.io.WriteToText(output_prefix)
   )

   p.run().wait_until_finish()






In [4]:
# Window OS
!type tmp\output-00000-of-00001

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;


In [5]:
import apache_beam as beam
import sys

# 기존 출력 파일 삭제
import os
output_text_file = "tmp/output-00000-of-00001"
if (os.path.isfile(output_text_file)):
    os.remove(output_text_file)

def startsWith(line, term):
   if line.startswith(term):
      yield line

def splitPackageName(packageName):
   """e.g. given com.example.appname.library.widgetname
           returns com
	           com.example
                   com.example.appname
      etc.
   """
   result = []
   end = packageName.find('.')
   while end > 0:
      result.append(packageName[0:end])
      end = packageName.find('.', end+1)
   result.append(packageName)
   return result

def getPackages(line, keyword):
   start = line.find(keyword) + len(keyword)
   end = line.find(';', start)
   if start < end:
      packageName = line[start:end].strip()
      return splitPackageName(packageName)
   return []

def packageUse(line, keyword):
   packages = getPackages(line, keyword)
   for p in packages:
      yield (p, 1)
    
keyword = 'import'
input = 'javahelp/IsPopular.java'
output_prefix = 'tmp/output'

p = beam.Pipeline(argv=sys.argv)
# find most used packages
(p
  | 'GetJava' >> beam.io.ReadFromText(input)
  | 'GetImports' >> beam.FlatMap(lambda line: startsWith(line, keyword))
  | 'PackageUse' >> beam.FlatMap(lambda line: packageUse(line, keyword))
  | 'TotalUse' >> beam.CombinePerKey(sum)
  | 'Top_5' >> beam.transforms.combiners.Top.Of(5, key=lambda kv: kv[1])
  | 'write' >> beam.io.WriteToText(output_prefix)
)

p.run()



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x2673d1a4e20>

In [6]:
# Window OS
!type tmp\output-00000-of-00001



[('org.apache.beam.sdk', 11), ('org.apache', 11), ('org.apache.beam', 11), ('org', 11), ('org.apache.beam.sdk.options', 4)]


### Generator

In [7]:
# Generator
lines= ['package com.google.cloud.training.dataanalyst.javahelp;',
        'import org.apache.beam.sdk.Pipeline;',
        'import org.apache.beam.sdk.io.TextIO;',
        'import org.apache.beam.sdk.options.PipelineOptions;',
        'import org.apache.beam.sdk.options.PipelineOptionsFactory;',
        'import org.apache.beam.sdk.transforms.DoFn;',
        'import org.apache.beam.sdk.transforms.ParDo;' ]

keyword ='import'
def startsWith(line, term):
    if line.startswith(term):
        yield line
        
result = map(lambda line: startsWith(line, keyword),lines)
for k in result:
    print(list(k))

[]
['import org.apache.beam.sdk.Pipeline;']
['import org.apache.beam.sdk.io.TextIO;']
['import org.apache.beam.sdk.options.PipelineOptions;']
['import org.apache.beam.sdk.options.PipelineOptionsFactory;']
['import org.apache.beam.sdk.transforms.DoFn;']
['import org.apache.beam.sdk.transforms.ParDo;']


In [8]:
# import pandas as pd
# d = {'a':[1,2,3,4],
#      'b':[5,6,7,8],
#      'c':[9,10,11,12]}
# df = pd.DataFrame(d)
# print(df.a)
# print(df['a'])
# df

In [9]:
# a,b,c = (1,2,3)

#### 문자열 처리 함수 분석

In [10]:
def splitPackageName(packageName):
   """e.g. given com.example.appname.library.widgetname
           returns com
	           com.example
                   com.example.appname
      etc.
   """
   result = []
   end = packageName.find('.')
   while end > 0:
      result.append(packageName[0:end])
      end = packageName.find('.', end+1)
   result.append(packageName)
   return result

str1 = 'com.example.appname.library.widgetname'
splitPackageName(str1)

['com',
 'com.example',
 'com.example.appname',
 'com.example.appname.library',
 'com.example.appname.library.widgetname']

In [11]:
def getPackages(line, keyword):
   start = line.find(keyword) + len(keyword)
   end = line.find(';', start)
   if start < end:
      packageName = line[start:end].strip()
      return splitPackageName(packageName)
   return []

getPackages(lines[1],keyword)

['org',
 'org.apache',
 'org.apache.beam',
 'org.apache.beam.sdk',
 'org.apache.beam.sdk.Pipeline']

In [12]:
def packageUse(line, keyword):
   packages = getPackages(line, keyword)
   for p in packages:
      yield (p, 1)
output = packageUse(lines[1], keyword)    
for k in output:
    print(list(k))

['org', 1]
['org.apache', 1]
['org.apache.beam', 1]
['org.apache.beam.sdk', 1]
['org.apache.beam.sdk.Pipeline', 1]
