In [None]:
#imports
import json

In [None]:
#function definitions

def tag_json(json_string):
    """
    Tag one raw input line as decodable ("good") or not ("bad").

    Returns a JSON-encoded single-key dict:
      * {"good": <original line>} when the line parses as JSON and can be re-encoded
      * {"bad": <text>} otherwise, where <text> packs the exception type, the first
        exception argument and the offending line into one string
    """
    try:
        # only validity matters here; the parsed value itself is discarded
        json.loads(json_string)
        tagged = json.dumps({"good": json_string})
    except Exception as err:
        error_fields = (type(err), err.args[0], json_string)
        tagged = json.dumps({"bad": "['%s'',''%s'',''%s]" % error_fields})
    return tagged
    
def _stringify_at(record, path):
    """
    Replace the value stored at ``path`` (a sequence of nested dict keys) with str(value).

    Does nothing when any key along the path is missing or an intermediate
    value is not a dict; leaves the value untouched when str() itself fails.
    """
    node = record
    for key in path[:-1]:
        if not isinstance(node, dict) or key not in node:
            return
        node = node[key]
    leaf = path[-1]
    if not isinstance(node, dict) or leaf not in node:
        return
    try:
        node[leaf] = str(node[leaf])
    except Exception:
        # conversion failed (e.g. non-ASCII text under Python 2): keep the original value
        pass


def decimal_to_string(json_string):
    """
    convert (decimal) value of shodan key to string

    Parses ``json_string``, converts the following fields to str when present,
    and returns the record re-encoded as a JSON string:
        location.longitude, location.latitude, port, ip, ssl.cert.serial

    Every field is best-effort: a record lacking one of them (or carrying a
    non-dict where a nested object is expected) is passed through instead of
    raising. Previously only ssl.cert.serial was tolerant; a missing
    location/port/ip re-raised from inside the except handler.

    Raises whatever json.loads raises if ``json_string`` is not valid JSON.
    """
    stringified_paths = (
        ("location", "longitude"),
        ("location", "latitude"),
        ("port",),
        ("ip",),
        ("ssl", "cert", "serial"),
    )
    record = json.loads(json_string)
    for path in stringified_paths:
        _stringify_at(record, path)
    return json.dumps(record)

In [None]:
#load shodan files from HDFS
# `sc` is not defined in this notebook; presumably the SparkContext provided by the PySpark kernel -- TODO confirm.
# 10000 is passed as textFile's second argument (presumably the minimum partition count -- confirm against the API docs).
# .cache() marks the RDD for reuse: both the `errors` and `shodan` RDDs are derived from it in later cells.
jsn = sc.textFile("/data/staging/shodan/raw_uncompressed", 10000).cache()

In [None]:
#tag every raw line as "good" or "bad"; each element of `tagged` is the JSON string returned by tag_json
tagged = jsn.map(tag_json)
# to inspect a single tagged record, uncomment:
# print tagged.take(1)[0]

In [None]:
#split tagged RDD into 2 RDDs: shodan ("good" and valid records) and errors ("bad" records)
# Decode each tagged record once and share the result between both branches
# (previously every record was json.loads-ed in the filter and again in the map).
# tag_json always emits a single-key dict, so a membership test selects the same
# records as `.keys()[0] == ...` did, without relying on Python 2's list-returning keys().
tagged_records = tagged.map(json.loads)
errors = tagged_records.filter(lambda rec: "bad" in rec).map(lambda rec: rec["bad"]).cache()
shodan = tagged_records.filter(lambda rec: "good" in rec).map(lambda rec: decimal_to_string(rec["good"])).cache()

In [None]:
#convert good RDD to dataframe
# `shodan` holds one JSON string per record (the output of decimal_to_string).
# `sqlContext` is not defined in this notebook; presumably the SQLContext provided by the PySpark kernel -- TODO confirm.
shodanJSON = sqlContext.jsonRDD(shodan)

In [None]:
#save as gz.parquet file
# NOTE(review): this path is repeated as the LOCATION of the staging table in the cell that writes shodan.hql; keep the two in sync.
# NOTE(review): no compression codec is set here; "gz" presumably comes from the cluster's Spark defaults -- confirm.
shodanJSON.write.parquet("/user/svillanueva/spark_jsonRDD_pq_shodan_201501_FULL")

In [None]:
#save bad RDD (errors) to file
# each element is the "bad" string built by tag_json: exception type, first exception argument and the raw input line
# NOTE(review): relative output path; where it resolves depends on the Spark/HDFS configuration -- confirm.
errors.saveAsTextFile("spark_shodan_errors")

In [None]:
#check shodan columns: flat list of column names, a blank separator line, then the inferred schema tree
print(shodanJSON.columns)
print("")
shodanJSON.printSchema()

In [None]:
#create hive table snappy parquet and insert data from gz.parquet
#create external table works but insert into snappy parquet doesn't work yet because of compatibility issues with Spark parquet and Hive parquet
#the generated script has three parts: external staging table over the parquet output,
#empty partitioned snappy table, and a dynamic-partition insert from staging into it
with open("shodan.hql", "wb") as hql_file:
    column_names = shodanJSON.columns

    # session settings and opening of the staging table definition
    hql_file.write(
        "use shodan;\nSET hive.exec.dynamic.partition.mode=nonstrict;\n"
        "--increase map and reduce memory to handle java heap size error\n"
        "SET mapreduce.map.memory.mb=15000;\n"
        "SET mapreduce.reduce.memory.mb=5000;\n\n"
        "--create external staging table\n"
        "CREATE EXTERNAL TABLE shodan.pq_shodan_staging_full\n(\n"
    )

    # every column is declared as string; all but the last carry a trailing comma
    hql_file.write("".join(name + " string,\n" for name in column_names[:-1]))
    hql_file.write(
        column_names[-1] + " string\n)\nSTORED AS PARQUET\n"
        "LOCATION '/user/svillanueva/spark_jsonRDD_pq_shodan_201501_FULL';\n\n"
        "--create empty pq full table\n"
        "CREATE TABLE shodan.pq_shodan_201501_full\n(\n"
    )

    # same column list again for the partitioned target table
    hql_file.write("".join(name + " string,\n" for name in column_names[:-1]))
    hql_file.write(
        column_names[-1] + " string\n)\nPARTITIONED BY (year_ts INT, month_ts INT, day_ts INT)\n"
        "STORED AS PARQUET\nTBLPROPERTIES ('parquet.compression'='SNAPPY');\n\n"
        "--insert data into pq_shodan_201501_full\n"
        "INSERT INTO shodan.pq_shodan_201501_full\n"
        "PARTITION(year_ts, month_ts, day_ts)\nSELECT "
    )

    # select every source column, followed by the three partition values derived from `timestamp`
    hql_file.write("".join(name + ",\n" for name in column_names))
    hql_file.write(
        "YEAR(to_date(timestamp)),\nMONTH(to_date(timestamp)),\nDAY(to_date(timestamp))\n"
        "FROM shodan.pq_shodan_staging_full\n;"
    )