In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Create the SparkContext (and an SQLContext) for this notebook.
# To request 250 cores on the L41 cluster, replace the bare SparkContext()
# call below with these two lines:
#conf = SparkConf().set("spark.cores.max", 250)
#sc = SparkContext(conf=conf)

sc = SparkContext()
# NOTE(review): sqlCtx is not referenced by any other cell in this notebook.
sqlCtx = SQLContext(sc)

In [2]:
# Hadoop codec class name for gzip-compressed output.
# NOTE(review): not passed to any saveAsTextFile call in this notebook, so every
# output below is written uncompressed.
GZIP_CODEC = "org.apache.hadoop.io.compress.GzipCodec"

In [3]:
import subprocess

def list_hdfs_dir(directory):
    """Return the paths listed by ``hadoop fs -ls <directory>``.

    The Hadoop CLI is run without a shell, so ``directory`` is passed as one
    argument and is never interpreted by a shell. The output is decoded as
    UTF-8 (rather than via the repr of the bytes object), and the path column
    of every entry line is kept intact, including any spaces.

    Args:
        directory: HDFS URI or path to list.

    Returns:
        list of str: one path per listed entry, in the order hadoop printed
        them; an empty list when nothing is listed.

    Raises:
        subprocess.CalledProcessError: if ``hadoop fs -ls`` exits non-zero.
    """
    output = subprocess.check_output(["hadoop", "fs", "-ls", directory])
    paths = []
    for line in output.decode("utf-8").splitlines():
        # Entry lines have 8 columns:
        #   perms replication owner group size date time path
        # Header lines such as "Found 3 items" have fewer and are skipped.
        # maxsplit=7 keeps spaces inside the final (path) column.
        fields = line.split(None, 7)
        if len(fields) == 8:
            paths.append(fields[7])
    return paths

#original_data_folder = "hdfs://namenode/datasets/github/json/"
#original_data_paths = list_hdfs_dir(original_data_folder)

In [4]:
import json
def convert_json(x):
    """Decode one line of JSON text into the matching Python object."""
    decoded = json.loads(x)
    return decoded

def to_json_string(x):
    """Encode ``x`` as compact JSON text; embedded newlines are escaped, so the result is one line."""
    encoded = json.dumps(x)
    return encoded

In [5]:
def is_valid_syntax(x):
    """Return True if ``x["content"]`` compiles as Python source.

    Uses the built-in ``compile`` of the interpreter running this function,
    so "valid" means valid for that interpreter's Python version.

    Args:
        x: record dict with a "content" key holding the source text.

    Returns:
        bool: True when compilation succeeds; False when it raises (syntax
        errors, null bytes in the source, non-string content, a missing
        "content" key, ...).
    """
    try:
        # compile will throw on syntax errors and other unusable input.
        # Catch Exception rather than using a bare except, so that
        # KeyboardInterrupt/SystemExit still propagate.
        compile(x["content"], "script_string", "exec")
        return True
    except Exception:
        return False


def is_invalid_syntax(x):
    """Complement of is_valid_syntax, for use as a Spark filter predicate."""
    return not is_valid_syntax(x)

In [6]:
def is_not_utf8(x):
    """Return True if ``x["content"]`` is pure ASCII text.

    Despite the name, this does not detect "not UTF-8". The test
    ``len(s) == len(s.encode())`` holds exactly when every character encodes
    to a single UTF-8 byte, i.e. when the text is ASCII. Here it is
    implemented as an ASCII encode attempt. Unlike ``s.encode()``, that
    attempt does not raise on lone surrogates (for example from JSON escapes
    like "\\ud800"), so one odd record cannot fail the whole Spark job.

    Args:
        x: record dict with a "content" key.

    Returns:
        bool: True for ASCII-only string content. False for any non-ASCII
        text (including lone surrogates) and for non-string content such
        as None.
    """
    content = x["content"]
    if not isinstance(content, str):
        return False
    try:
        content.encode("ascii")
    except UnicodeEncodeError:
        return False
    return True


def is_utf8(x):
    """Return True if ``x["content"]`` is NOT pure ASCII (complement of is_not_utf8)."""
    return not is_not_utf8(x)

In [7]:
# Full input: every file under datasets/github/json/.
# NOTE(review): the next cell reassigns original_data_path, so a top-to-bottom
# run reads that path instead of this one.
original_data_path = "hdfs://namenode/datasets/github/json/*"
#original_data_path = "hdfs://namenode/datasets/github/json/contents000000000000.json.gz"
#original_data_path = "hdfs://namenode/datasets/github/json/contents000000000003.json.gz" #11676

In [5]:
# NOTE(review): overrides the full-data path from the previous cell with a single
# part file (apparently output of an earlier 03_py3 run) for the local sample
# below. Comment this out before running the full pipeline; otherwise every later
# cell, including the 00_json..03_py3 writes, processes only this one file.
original_data_path = "hdfs://namenode/datasets/github/processed/03_py3/success/part-00000"

In [6]:
# RDD with one element per line of text at original_data_path
# (each line is parsed as JSON further down by convert_json).
j_rdd = sc.textFile(original_data_path)

In [8]:
# Pull every line of j_rdd onto the driver as a Python list.
# NOTE(review): only safe while original_data_path points at a single small file;
# against the full json/* input this would try to hold the whole dataset on the driver.
j_rdd_local = j_rdd.collect()

In [None]:
# Peek at the local sample file.
# NOTE(review): this cell comes before the cell that writes the file, so on a
# fresh Restart & Run All it shows a stale (or missing) file.
!head /local_data/altair/sample_data.json



In [13]:
import json
# Dump the collected lines to a local file, one record per line.
# Lines read via sc.textFile carry no line terminator, so "\n" is re-added here;
# without it every record is concatenated onto a single line. UTF-8 is explicit
# so non-ASCII source text does not depend on the machine's locale encoding.
with open("/local_data/altair/sample_data.json", "w", encoding="utf-8") as f:
    for j in j_rdd_local:
        f.write(j + "\n")

In [15]:
# Read the sample back as a list of lines (each keeps its trailing "\n").
# Decode as UTF-8 explicitly instead of with the locale's default encoding,
# which may not be able to decode non-ASCII source text.
with open("/local_data/altair/sample_data.json", "r", encoding="utf-8") as f:
    lines = f.readlines()

In [9]:
# Redistribute the records across 5000 partitions before the per-record work below.
# NOTE(review): 5000 is a hand-picked constant; consider naming it in a config cell.
j_rdd = j_rdd.repartition(5000)

In [10]:
# Parse each JSON line; later cells index the parsed records as x["content"].
# NOTE(review): rebinding j_rdd means re-running this cell would apply
# convert_json to already-parsed records, and json.loads would fail on them.
j_rdd = j_rdd.map(convert_json)

In [11]:
# Write the parsed records as JSON lines. saveAsTextFile writes str() of each
# element, which for a dict is its Python repr (single quotes, True/None) rather
# than JSON, so serialise explicitly; the output can then be re-read with convert_json.
j_rdd.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/00_json")

---

In [12]:
# Split by character set. Despite the names, is_not_utf8 is true for ASCII-only
# content, so "success" holds pure-ASCII files and "error" holds everything else.
notutf8_rdd_success = j_rdd.filter(is_not_utf8)
notutf8_rdd_error = j_rdd.filter(is_utf8)

In [13]:
# Write ASCII-only records as JSON lines (not as Python dict reprs).
notutf8_rdd_success.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/01_notutf8/success")

In [14]:
# Write non-ASCII records as JSON lines (not as Python dict reprs).
notutf8_rdd_error.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/01_notutf8/error")

---

In [15]:
# Among the ASCII-only records, split by whether the content compiles under
# the Python version running on the workers.
syntax_rdd_success = notutf8_rdd_success.filter(is_valid_syntax)
syntax_rdd_error = notutf8_rdd_success.filter(is_invalid_syntax)

In [16]:
# Write records that compile as JSON lines (not as Python dict reprs).
syntax_rdd_success.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/02_syntax/success")

In [17]:
# Write records that fail to compile as JSON lines (not as Python dict reprs).
syntax_rdd_error.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/02_syntax/error")

In [18]:
# Count records that passed both the ASCII and the syntax filter; compare with
# the expected value in the trailing comment.
# NOTE(review): an assert would make this check fail loudly instead of relying on eyeballing.
syntax_rdd_success.count() # should be 4023537

4023537

In [19]:
# Count all parsed input records; compare with the expected value in the trailing comment.
j_rdd.count() # should be 5267543

5267543

---

In [51]:
from lib2to3.refactor import RefactoringTool, get_fixers_from_package

In [52]:
# Fixer module names are plain strings, cheap to ship to Spark workers, so
# they are discovered once here instead of on every record.
_PY3_FIXERS = get_fixers_from_package('lib2to3.fixes')


def convert_python3(x):
    """Translate ``x["content"]`` from Python 2 to Python 3 with lib2to3.

    Args:
        x: record dict with a "content" key holding Python source text.
            The dict is not modified.

    Returns:
        tuple: ``(True, converted)``, where ``converted`` is a shallow copy of
        ``x`` whose "content" is the 2to3 output; or ``(False, x)`` if lib2to3
        could not parse or convert the source.
    """
    try:
        # NOTE(review): a RefactoringTool is still built per record, which is
        # costly; a mapPartitions-based caller could share one per partition.
        refactoring_tool = RefactoringTool(fixer_names=_PY3_FIXERS)
        # lib2to3's parser rejects some source that lacks a trailing newline.
        # As RefactoringTool.refactor_file does, append "\n" before parsing
        # and drop it from the result, so the text otherwise round-trips.
        node3 = refactoring_tool.refactor_string(x["content"] + "\n", 'script')
        if node3 is None:
            return (False, x)
        converted = dict(x)
        converted["content"] = str(node3)[:-1]
        return (True, converted)
    except Exception:
        return (False, x)

In [53]:
def is_success(x):
    """Return the success flag from a ``(succeeded, record)`` pair produced by convert_python3."""
    succeeded = x[0]  # element 0 is the flag; element 1 is the record
    return succeeded

In [54]:
# Run 2to3 over every record that compiled, then keep only the successful
# conversions, unwrapped from their (flag, record) pairs.
py3_rdd = syntax_rdd_success.map(convert_python3)
py3_rdd_success = py3_rdd.filter(is_success).map(lambda pair: pair[1])

In [None]:
# Write the converted records as JSON lines using the to_json_string helper
# ("dump_json" is not defined by any cell of this notebook).
py3_rdd_success.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/03_py3/success")

In [None]:
# Records whose 2to3 conversion failed, unwrapped from their (flag, record) pairs.
# Selecting on the flag directly is a single narrow pass (no subtract/shuffle),
# and it compares like with like: py3_rdd holds pairs, while py3_rdd_success
# holds bare records.
py3_rdd_error = py3_rdd.filter(lambda pair: not is_success(pair)).map(lambda pair: pair[1])

In [None]:
# Write the failed conversions as JSON lines using the to_json_string helper
# ("dump_json" is not defined by any cell of this notebook).
py3_rdd_error.map(to_json_string).saveAsTextFile("hdfs://namenode/datasets/github/uncompressed/03_py3/errors")