Skip to content

Commit

Permalink
added other clustering algos
Browse files Browse the repository at this point in the history
  • Loading branch information
marty90 committed Nov 28, 2017
1 parent 735df6b commit a60494d
Showing 1 changed file with 3 additions and 44 deletions.
47 changes: 3 additions & 44 deletions run_job.py
Expand Up @@ -9,12 +9,10 @@
import json
import os
import zipfile

import core.utils

def main():



# Configure argparse
parser = argparse.ArgumentParser(description='NetLytics Job')

Expand Down Expand Up @@ -75,33 +73,8 @@ def main():
sc = SparkContext(conf = conf)
spark = SparkSession(sc)

# Ship code to executors
ship_dir (base_path + "/algos",sc, base_path)
ship_dir (base_path + "/core",sc, base_path)
ship_dir (base_path + "/connectors",sc, base_path)


# Find connector
connector_module = my_import(connector,sc)

# Parse dates
y1,m1,d1 = start_day.split("_")
date1 = date (int(y1),int(m1),int(d1))
y2,m2,d2 = end_day.split("_")
date2 = date (int(y2),int(m2),int(d2))

# Instantiate connector
connector_instance = connector_module(input_path,date1,date2 )

# Get and enforce Schema
output_type = connector_instance.output_type
schema_file = base_path + "/schema/" + output_type + ".json"
schema_json = json.load(open(schema_file,"r"))
schema = StructType.fromJson(schema_json)
connector_instance.set_schema (schema)

# Get Dataset
dataset = connector_instance.get_DF(sc,spark)
# Create the dataframe
dataset = core.utils.get_dataset(sc,spark,base_path,connector,input_path,start_day, end_day )

# Create Algo
algo_module = my_import(algo,sc)
Expand Down Expand Up @@ -129,20 +102,6 @@ def my_import(name,sc):
my_class = getattr(module, labels[-1])
return my_class

# Counter incremented by ship_dir() so every archive gets a distinct name.
n = 0


def ship_dir(path, sc, base_path):
    """Zip the directory *path* and register the archive with *sc*.

    The archive is written to ``/tmp/<n>.zip``, where ``n`` is the
    module-level counter incremented on every call, and is then handed to
    ``sc.addPyFile``. Entry names are made relative to *base_path* by
    ``zipdir``.

    Args:
        path: Directory whose content is archived.
        sc: Object exposing ``addPyFile`` (the SparkContext in this script).
        base_path: Directory that archive entry names are relative to.
    """
    global n
    n += 1
    # NOTE(review): the fixed /tmp/<n>.zip name means two jobs started on the
    # same host write to the same files -- confirm this is acceptable.
    archive_path = '/tmp/' + str(n) + '.zip'
    # The with-block closes the archive even when zipdir() raises, so the
    # handle is never leaked and the file is complete before it is shipped.
    with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        zipdir(path + '/', zipf, base_path)
    sc.addPyFile(archive_path)

def zipdir(path, ziph, base_path):
    """Add every file found under *path* to the open archive *ziph*.

    Each file is stored under its path relative to *base_path*. Directories
    and files are visited in sorted order so that the resulting archive has
    the same entry order on every run and every host.

    Args:
        path: Directory that is walked recursively.
        ziph: Open, writable ``zipfile.ZipFile`` handle.
        base_path: Directory that archive entry names are relative to.
    """
    for root, dirs, files in os.walk(path):
        # Sorting in place also fixes the order in which os.walk descends.
        dirs.sort()
        for filename in sorted(files):
            full_path = os.path.join(root, filename)
            ziph.write(full_path, os.path.relpath(full_path, base_path))

# Run the job only when this file is executed as a script, so that importing
# the module does not start a job as a side effect.
if __name__ == "__main__":
    main()

Expand Down

0 comments on commit a60494d

Please sign in to comment.