In [1]:
import sys, string
import os
import socket
import time
from time import gmtime, strftime
import operator
import boto3
import json
import findspark
from pyspark.sql import SparkSession
from datetime import datetime


In [2]:
findspark.init()

In [3]:
spark = SparkSession\
        .builder\
        .appName("ethereum-sneha-C")\
        .getOrCreate()

In [4]:
def filter_transaction_line(line):
    try:
        fields = line.split(',')
        if len(fields) != 7 or int(fields[3]) ==0 or fields[0] == 'block_number':
            return False
        else:
            str(fields[2]) # convert to_address to string 
            float(fields[3]) # convert value to float
            int(fields[6]) # convert timestamp to int
            return True
    except:
        return False
        
def filter_scams_line(line):
    try:
        fields = line.split(',')
        if len(fields) != 8 or fields[0] == 'id':
            return False
        else:
            str(fields[4]) # convert scam category to string 
            str(fields[6]) # convert scam address to string 
            return True
    except:
        return False
def mapper_transactions(line):
    try:
        fields = line.split(',')
        to_addr = fields[2]
        wei = int(fields[3])
        raw_timestamp = int(fields[6])
        year_month = time.strftime('%Y-%m', time.gmtime(raw_timestamp))

        key = to_addr
        value = ( year_month, wei, 1 )
        return (key, value)
    except:
        pass

        
    


In [5]:
transactions_file = spark.sparkContext.textFile("./transactions.csv").filter(filter_transaction_line)
# calculate value received at each address in each month
monthly_value_at_address = transactions_file.map(lambda x: ( x.split(',')[2], (strftime('%Y-%m', gmtime(int(x.split(',')[6]))), float(x.split(',')[3]), 1)))
# monthly_value_at_address = transactions_file.map(mapper_transactions)


In [6]:
scams_file = spark.sparkContext.textFile("./scams.csv").filter(filter_scams_line)
# scam_addresses = scams_file.map(lambda x: (x.split(',')[6], x.split(',')[4]))
scam_addresses = scams_file.map(lambda x: (x.split(',')[6], ( x.split(',')[0], x.split(',')[4])))

In [7]:
# Join Transactions and Scams
transactions_scams_joined = monthly_value_at_address.join(scam_addresses)

In [8]:
transactions_scams_joined.collect()

[('0x8fb3a29cad1393852b8a88f6fb9e30ea8ac307cf',
  (('2016-09', 6621260000000000.0, 1),
   ('0x8fb3a29cad1393852b8a88f6fb9e30ea8ac307cf', 'Scamming')))]

In [9]:
# step3 = transactions_scams_joined.map(lambda x: ((x[1][1], x[1][0][0]), (x[1][0][1], x[1][0][2])))
step3 = transactions_scams_joined.map(lambda x: ((x[1][1][0], x[1][1][1], x[1][0][0]), (x[1][0][1], x[1][0][2])))
step4 = step3.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]) )
step5 = step4.map(lambda x: '{},{},{},{},{}'.format(x[0][0], x[0][1], float(x[1][0]/1000000000000000000), x[1][1], float(x[1][0]/1000000000000000000)/x[1][1]))

In [10]:
monthly_value_at_address.collect()[:5]

[('0x8fb3a29cad1393852b8a88f6fb9e30ea8ac307cf',
  ('2016-09', 6621260000000000.0, 1)),
 ('0x5ade579853416fe093874dec1a511271d66f60b3',
  ('2016-09', 2145019333590000.0, 1)),
 ('0xe3a1f697cca3403d3e351821fdbaa33326372d89',
  ('2016-09', 2452260368565000.0, 1)),
 ('0x99e71413683a5d4bff6782caaa1750618a1a361e',
  ('2016-09', 1000000000000000.0, 1)),
 ('0x99e71413683a5d4bff6782caaa1750618a1a361e',
  ('2016-09', 1000000000000000.0, 1))]

In [11]:
scam_addresses.collect()[:5]

[('0x00e01a648ff41346cdeb873182383333d2184dd1',
  ('0x00e01a648ff41346cdeb873182383333d2184dd1', 'Phishing')),
 ('0x858457daa7e087ad74cdeeceab8419079bc2ca03',
  ('0x858457daa7e087ad74cdeeceab8419079bc2ca03', 'Phishing')),
 ('0x4cdc1cba0aeb5539f2e0ba158281e67e0e54a9b1',
  ('0x4cdc1cba0aeb5539f2e0ba158281e67e0e54a9b1', 'Phishing')),
 ('0x11c058c3efbf53939fb6872b09a2b5cf2410a1e2c3f3c867664e43a626d878c0',
  ('0x11c058c3efbf53939fb6872b09a2b5cf2410a1e2c3f3c867664e43a626d878c0',
   'Phishing')),
 ('0x2dfe2e0522cc1f050edcc7a05213bb55bbb36884ec9468fc39eccc013c65b5e4',
  ('0x11c058c3efbf53939fb6872b09a2b5cf2410a1e2c3f3c867664e43a626d878c0',
   'Phishing'))]

In [12]:
transactions_scams_joined.collect()

[('0x8fb3a29cad1393852b8a88f6fb9e30ea8ac307cf',
  (('2016-09', 6621260000000000.0, 1),
   ('0x8fb3a29cad1393852b8a88f6fb9e30ea8ac307cf', 'Scamming')))]

In [13]:
step5.collect()

['0x8fb3a29cad1393852b8a88f6fb9e30ea8ac307cf,Scamming,0.00662126,1,0.00662126']