In [1]:
import logging
import re
from io import StringIO

import boto3
import nltk
import numpy as np
import pandas as pd
from botocore.exceptions import ClientError
from nltk.corpus import stopwords
from nltk.util import ngrams

In [2]:
class Streaming:
    """Score job postings by skill category and publish them to S3 and Kinesis.

    Pipeline:
      * ``loadData`` reads the word-count and skill-category CSVs from the
        ``hiringtrendanalysis`` S3 bucket.
      * ``categorize_jobs`` counts, per posting, how many skills of each
        category appear, and uploads the result via ``uploadToS3``.
      * ``sendKinesis`` streams the categorized rows to a Kinesis data stream.
    """

    # S3 bucket holding the inputs and the categorized output (already created).
    S3_BUCKET = 'hiringtrendanalysis'
    WORDCOUNT_URL = 's3://hiringtrendanalysis/wordcount.csv'
    CATEGORIES_URL = 's3://hiringtrendanalysis/categories.csv'
    OUTPUT_KEY = 'jobs_categorized.csv'

    # Kinesis PutRecords accepts at most 500 records per request.
    KINESIS_MAX_RECORDS = 500
    # Conservative flush threshold in payload bytes; far below the
    # PutRecords per-request size limit.
    KINESIS_MAX_BATCH_BYTES = 50000

    def __init__(self):
        self.message = 'This is init'

    def createClient(self, service, region):
        """Return a low-level boto3 client for ``service`` in ``region``."""
        return boto3.client(service, region_name=region)

    def loadData(self):
        """Read the word-count and skill-category tables from S3.

        Returns:
            (df_data, df_categories): DataFrames parsed from ``wordcount.csv``
            and ``categories.csv`` in the project bucket.
        """
        # pandas opens s3:// URLs itself (through fsspec/s3fs), so no boto3
        # resource object is needed here.
        df_data = pd.read_csv(self.WORDCOUNT_URL)
        df_categories = pd.read_csv(self.CATEGORIES_URL)
        return df_data, df_categories

    def uploadToS3(self, result):
        """Upload DataFrame ``result`` as CSV to ``s3://<S3_BUCKET>/<OUTPUT_KEY>``.

        Returns:
            True on success; False if S3 rejected the request (the error is logged).
        """
        csv_buffer = StringIO()
        result.to_csv(csv_buffer, index=False)
        try:
            s3_resource = boto3.resource('s3')
            s3_resource.Object(self.S3_BUCKET, self.OUTPUT_KEY).put(Body=csv_buffer.getvalue())
        except ClientError as e:
            logging.error(e)
            return False
        return True

    def categorize_jobs(self, wordcount, categories):
        """Count, per job posting, how many skills of each category it mentions.

        Args:
            wordcount: one row per posting with ``company``, ``location``,
                ``title``, ``job_desc`` and ``role`` columns plus one numeric
                column per skill; a skill counts when its value is > 0.
            categories: skill taxonomy with columns ``Core Technical Skills``,
                ``Communication Skills`` and ``Networking Skills``. Each cell
                names a skill column of ``wordcount``. NaN padding in shorter
                columns is ignored.

        Returns:
            DataFrame with the five descriptive columns plus integer counts
            ``core_tech_skills``, ``comm_skills`` and ``network_skills``.
            The frame is also uploaded with ``uploadToS3``.
        """
        def _skill_names(column):
            # str(NaN) == 'nan', so this drops the padding cells of shorter columns.
            return [str(v) for v in categories[column] if str(v) != 'nan']

        def _positive_counts(skills):
            # Number of listed skills with a positive count in each row
            # (NaN compares False, exactly like the scalar `> 0` test).
            return (wordcount[skills] > 0).sum(axis=1).astype('int64')

        techskills = _skill_names('Core Technical Skills')
        comskills = _skill_names('Communication Skills')
        netskills = _skill_names('Networking Skills')

        job_categorized = wordcount[['company', 'location', 'title', 'job_desc', 'role']].copy()
        job_categorized['core_tech_skills'] = _positive_counts(techskills)
        job_categorized['comm_skills'] = _positive_counts(comskills)
        job_categorized['network_skills'] = _positive_counts(netskills)

        self.uploadToS3(job_categorized)

        return job_categorized

    def sendKinesis(self, kinesis_client, kinesis_stream_name, kinesis_shard_count, data):
        """Stream every row of ``data`` to a Kinesis data stream via PutRecords.

        Each row becomes one record whose payload is its values joined with
        '|' and UTF-8 encoded. Records are batched, and a batch is flushed when:
          * it holds ``KINESIS_MAX_RECORDS`` records, or
          * its payload exceeds ``KINESIS_MAX_BATCH_BYTES``, or
          * the last row has been added.
        All records of one batch share a partition key that rotates through
        1..kinesis_shard_count.

        Args:
            kinesis_client: boto3 Kinesis client (anything with ``put_records``).
            kinesis_stream_name: name of the target stream.
            kinesis_shard_count: number of partition keys to rotate through.
            data: DataFrame whose rows are sent.
        """
        total_rows = len(data)
        batch = []
        batch_bytes = 0
        partition = 1
        failed_records = 0

        for row_number, (_, row) in enumerate(data.iterrows(), start=1):
            payload = '|'.join(str(value) for value in row).encode('utf-8')
            batch.append({"Data": payload, "PartitionKey": str(partition)})
            batch_bytes += len(payload)

            # row_number is 1-based, so this is true exactly on the final row;
            # the final (possibly partial) batch must always be flushed.
            is_last_row = row_number == total_rows
            if (len(batch) >= self.KINESIS_MAX_RECORDS
                    or batch_bytes > self.KINESIS_MAX_BATCH_BYTES
                    or is_last_row):
                response = kinesis_client.put_records(Records=batch, StreamName=kinesis_stream_name)
                # PutRecords is not all-or-nothing: individual records can fail.
                failed_records += response.get('FailedRecordCount', 0)
                batch = []
                batch_bytes = 0
                partition = partition + 1 if partition < kinesis_shard_count else 1

        if failed_records:
            logging.warning('Kinesis rejected %d of %d records', failed_records, total_rows)
        print('Total Records sent to Kinesis: {0}'.format(total_rows))
       

In [3]:
if __name__ == '__main__':
    # Pull the word-count and skill-category tables from S3.
    stream = Streaming()
    wordcount, categories = stream.loadData()

    # Count category skills per posting; this step also writes
    # jobs_categorized.csv back to the S3 bucket.
    categorized_jobs = stream.categorize_jobs(wordcount, categories)

    # Publish the categorized postings to the Kinesis data stream.
    kinesis_client = stream.createClient('kinesis', 'us-east-1')
    stream_name = "hiring_trend_analysis_dstream"
    stream_shard_count = 1  # partition keys rotate over 1..stream_shard_count
    stream.sendKinesis(kinesis_client, stream_name, stream_shard_count, categorized_jobs)

Total Records sent to Kinesis: 6227


In [4]:
# Preview the first five categorized postings.
categorized_jobs.head(n=5)

Unnamed: 0,company,location,title,job_desc,role,core_tech_skills,comm_skills,network_skills
0,Defibtech,"Branford, CT",Software Engineer,"['sql', 'erp', 'crossfunctional']",software engineer,2,1,1
1,"Bart & Associates, Inc.","Linthicum, MD",Mainframe Developer,"['aws', 'monitoring', 'aws', 'aws', 'aws', 'co...",software engineer,3,3,4
2,"Assured Information Security, Inc. (AIS)","Chelmsford, MA",Assistant Software Engineer - Cross Domain and...,"['security', 'linux']",software engineer,1,0,1
3,RPS,"South Kingstown, RI",Senior Software Engineer,"['leadership', 'javascript', 'leadership', 'ja...",software engineer,8,1,2
4,"CSAA Insurance Group, a AAA Insurer","Glendale, AZ",Junior Software Engineer,"['python', 'java', 'javascript', 'architecture...",software engineer,7,0,2
