In [None]:
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import SparkSession
import pyspark
import os

In [None]:
import json
from datetime import datetime

In [None]:
# Spark configuration for the "hackaton" application.
# The cluster endpoint and the PostgreSQL JDBC driver location default to the
# original hard-coded values, but can be overridden from the environment so the
# notebook also runs on machines with a different layout.
spark_master_url = os.environ.get("HACKATON_SPARK_MASTER", "spark://localhost:7077")
postgres_jdbc_jar = os.environ.get(
    "HACKATON_POSTGRES_JAR",
    "/home/juliotorres/hackaton/postgresql-42.2.10.jar",
)

conf = (
    SparkConf()
    .setAppName("hackaton")
    .setMaster(spark_master_url)
    # Use the Hive catalog implementation for Spark SQL.
    .set("spark.sql.catalogImplementation", "hive")
    # Ship the PostgreSQL JDBC driver jar with the application.
    .set("spark.jars", postgres_jdbc_jar)
)

In [None]:
# Reuse the running SparkContext when this cell is executed again: calling the
# SparkContext constructor a second time in the same kernel raises, because
# only one context may be active at a time. On a fresh kernel this still
# creates the context from `conf`.
sc = SparkContext.getOrCreate(conf=conf)
# Hive-aware SQL entry point used by the queries in this notebook.
hiveContext = HiveContext(sc)

In [None]:
# Plain SQLContext bound to the same SparkContext as hiveContext.
sqlContext = SQLContext(sc)

In [None]:
# Get (or reuse) a SparkSession with Hive support, pointing the SQL warehouse
# at the HDFS directory below.
# NOTE(review): "spark.sql.uris" does not look like a documented Spark setting;
# the Hive metastore URI is normally configured via "hive.metastore.uris" --
# confirm whether this option has any effect.
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")
    .config("spark.sql.uris", "thrift://localhost:10000")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Print the tables visible to the SparkSession.
spark.sql("show tables").show()

In [None]:
# Base directory that holds the parquet datasets read by the cells below.
data_path = "/user/juliotorres"

In [None]:
# Load the 'history' parquet dataset from under data_path as a DataFrame.
history = hiveContext.read.parquet(data_path+'/history')

In [None]:
# Load the 'candidates' parquet dataset from under data_path as a DataFrame.
candidates = hiveContext.read.parquet(data_path+'/candidates')

In [None]:
# Expose the `history` DataFrame to SQL queries under the table name 'history'.
hiveContext.registerDataFrameAsTable(history, 'history')

In [None]:
# Collect the `age` values greater than 70 from the 'history' table, ascending.
# NOTE(review): assumes the parquet data already has an `age` column -- confirm.
hiveContext.sql("""
    select age
      from history
     where age > 70
  order by age asc
""").collect()

```python
hiveContext.registerDataFrameAsTable(stages, 'stages')
hiveContext.registerDataFrameAsTable(vacants, 'vacants')
hiveContext.registerDataFrameAsTable(candidates, 'candidates')
hiveContext.registerDataFrameAsTable(applications, 'applications')
hiveContext.registerDataFrameAsTable(applicationStages, 'applicationStages')
```

In [None]:
# Register `history` as the SQL table 'history' (the same call also appears in
# an earlier cell, so this re-registers the table).
hiveContext.registerDataFrameAsTable(history, 'history')

In [None]:
# Print the tables visible through hiveContext.
hiveContext.sql("show tables").show()

In [None]:
#new_df = hiveContext.read.json(history.rdd.map(lambda r: r.candidate_educational_institution))

def _join_field(entries, field, in_progress):
    """Space-join ``field`` over the entries whose ``in_progress`` flag matches.

    Null field values contribute an empty string.
    """
    # `==` rather than `is` on purpose: flags encoded as 0/1 still match, and
    # an entry with a null flag ends up in neither group.
    return ' '.join(
        entry[field] or ''
        for entry in entries
        if entry['in_progress'] == in_progress
    )


def parse(r):
    """Flatten a row's JSON education history into space-joined text columns.

    Parameters
    ----------
    r : row-like object
        Must expose ``candidate_id`` and ``candidate_educational_institution``.
        The latter is JSON text holding a list of objects with the keys
        ``institute``, ``title``, ``study_type`` and ``in_progress``.

    Returns
    -------
    tuple
        ``(candidate_id, institute_complete, institute_incomplete,
        title_complete, title_incomplete, study_type_complete,
        study_type_incomplete)``. "complete" gathers the entries whose
        ``in_progress`` flag is false, "incomplete" those whose flag is true.

    A missing (``None``), blank or JSON ``null`` payload yields empty strings
    instead of raising. A blank string is not excluded by a ``<> '[]'`` filter,
    so it can reach this function.
    """
    raw = r.candidate_educational_institution
    # json.loads raises on None and on blank text; the text "null" decodes to None.
    entries = (json.loads(raw) or []) if raw and raw.strip() else []

    return (
        r.candidate_id,
        _join_field(entries, 'institute', False),
        _join_field(entries, 'institute', True),
        _join_field(entries, 'title', False),
        _join_field(entries, 'title', True),
        _join_field(entries, 'study_type', False),
        _join_field(entries, 'study_type', True),
    )

# Run parse() over (at most) 10 history rows whose education JSON is not the
# empty list, and turn the resulting tuples into a DataFrame. The column names
# follow the order of the tuple returned by parse().
df_institute = (
    history
    .where("candidate_educational_institution <> '[]'")
    .limit(10)
    .rdd
    .map(parse)
    .toDF([
        'candidate_id',
        'candidate_educational_institute_complete',
        'candidate_educational_institute_incomplete',
        'candidate_educational_title_complete',
        'candidate_educational_title_incomplete',
        'candidate_educational_study_type_complete',
        'candidate_educational_study_type_incomplete',
    ])
)

# Quick look at one of the derived columns.
df_institute.select('candidate_educational_study_type_complete').show()

In [None]:
# Join the derived education columns back onto history by candidate_id and
# print two rows of the result.
history.join(df_institute, "candidate_id").show(2)

In [None]:
# Print the total number of rows in the 'history' table.
hiveContext.sql("""
    select count(*) conteo
      from history a
""").show()

In [None]:
# Approximate median (0.5 percentile) of candidate_salary, computed over rows
# that have a salary and whose candidate_title_of_profetion is true.
candidate_salary_m = (
    hiveContext
    .sql("""
select percentile_approx(candidate_salary, 0.5) candidate_salary
  from history
 where candidate_salary is not null
   and candidate_title_of_profetion = true
 """)
    .collect()[0]["candidate_salary"]
)

candidate_salary_m

In [None]:
# Build a cleaned projection of `history`:
#   * keep rows whose education JSON is neither null nor the empty list '[]',
#   * take a single row (limit(1)),
#   * derive presence flags, an age in years (day difference / 365, cast to
#     integer), a one-letter gender code, and default-filled columns.
# Null salaries are replaced by candidate_salary_m, which is interpolated into
# the SQL text with %-formatting, so it must hold a number when this cell runs.
tmp = history \
    .where("candidate_educational_institution <> '[]' and candidate_educational_institution is not null") \
    .limit(1) \
    .selectExpr(
        """candidate_email      <> '' and candidate_email is not null as candidate_has_email""",
        """candidate_first_name <> '' and candidate_first_name is not null as candidate_has_first_name""",
        """candidate_last_name  <> '' and candidate_last_name is not null as candidate_has_last_name""",
        """candidate_phone      <> '' and candidate_phone is not null as candidate_has_phone""",

        """CAST(datediff(
            current_date(), 
            TO_DATE(CAST(UNIX_TIMESTAMP(candidate_birthdate,'yyyy-MM-dd') AS TIMESTAMP))
           )/365 as integer) as age""",

        """case when candidate_gender = 'male' then 'm'
                when candidate_gender = 'female' then 'f' 
           else 'u' end as candidate_gender""",

        """case when candidate_identification_type is null then 100 
           else candidate_identification_type 
           end candidate_identification_type""",

        """candidate_identification_number <> '' and 
           candidate_identification_number is not null 
           as candidate_has_identification_number""",

        """case when candidate_city <> '' and candidate_city is not null then candidate_city 
           else 'unknow' end as candidate_city""",

        """case when candidate_education_level <> '' and candidate_education_level is not null then candidate_education_level 
           else 'unknow' end as candidate_education_level""",

        """case when candidate_salary is null then %s 
           else candidate_salary end as candidate_salary""" % candidate_salary_m,

        """case when candidate_profile_description is null then ''
           else candidate_profile_description end as candidate_profile_description""",

        """case when candidate_withow_experience is null then false 
           else candidate_withow_experience = 1 end candidate_withow_experience""",

        """case when candidate_withow_studies is null then false 
           else candidate_withow_studies = 1 end candidate_withow_studies""",

        """case when candidate_sectors is null then 'oficios varios'
           else candidate_sectors end as candidate_sectors""",

        """case when candidate_title_of_profetion is null then false 
           else candidate_title_of_profetion = 1 end candidate_title_of_profetion""",

        """case when candidate_civil_status is null then 'unknow'
           else candidate_civil_status end candidate_civil_status""",

        """candidate_presentation is not null candidate_has_presentation""",
        # pull metadata

        # Raw education JSON, passed through unchanged.
        """candidate_educational_institution""",
    
    ) 

tmp

In [None]:
# Scratch check: parse an ISO-style date string with the '%Y-%m-%d' format.
datetime.strptime('2018-01-01', '%Y-%m-%d')

In [None]:
# Scratch check: `or` falls back to the right-hand value when the left is None.
None or 'ss'

In [None]:
# Display the `tmp` projection.
tmp

In [None]:
# Fetch three raw education JSON values (rows whose value is not '[]') to
# inspect their structure.
(
    history
    .where("candidate_educational_institution <> '[]'")
    .select('candidate_educational_institution')
    .limit(3)
    .collect()
)

In [None]:
# `tmp` is a DataFrame, so indexing it with tmp[0] selects a Column expression,
# not a row. Fetch the first Row to look at the actual education JSON value.
tmp.first().candidate_educational_institution

In [None]:
# Print how many history rows have a civil status recorded.
# The result is only displayed. show() returns None, so it must not be bound
# to candidate_salary_m: doing so would replace the median salary that the
# `tmp` projection interpolates into its SQL text.
hiveContext.sql("""
select count(1)
  from history 
 where candidate_civil_status is not null
 limit 10
 """).show()

In [None]:
# Sample up to 10 distinct candidate_sectors values containing 'Tecnica'.
# The limit is applied inside Spark, so only the sampled rows are sent to the
# driver instead of collecting every distinct value and slicing in Python.
(
    history
    .select("candidate_sectors")
    .where("candidate_sectors is not null")
    .where("candidate_sectors like '%Tecnica%'")
    .distinct()
    .limit(10)
    .collect()
)

In [None]:
# Up to 10 distinct values of the title_of_profetion column in `candidates`.
candidates.select("title_of_profetion").distinct().limit(10).collect()

In [None]:
import pandas as pd

In [None]:
# Read the raw candidates CSV with pandas; header=None means the file is read
# without a header row, so columns are addressed by position.
df = pd.read_csv("./data/Candidates.csv",header=None)

In [None]:
# Distinct values found in the column at position 17 of the raw CSV.
# NOTE(review): presumably the title_of_profetion column -- confirm the index.
df.iloc[:,17].unique()

In [None]:
# List the column names of the 'history' table; only `.columns` of the query
# result is read here.
hiveContext.sql("""
    select *
      from history a
     where candidate_email is not null
     limit 100
""").columns

In [None]:
# Pull 10 history rows into a pandas DataFrame for display.
# NOTE(review): the rows include candidate fields such as candidate_email --
# clear or redact the cell output before sharing the notebook.
hiveContext.sql("""
    select *
      from history a
     limit 10
""").toPandas()