# Title: Nurse connect Gates Investigation
# Doc # : Gates-1000-001 Upload Table of NC mobile users
# Author: Charles Copley
# Date : 27 June 2017
# Revision: A

In [1]:
#import libraries
import pandas as pd
import sqlalchemy 
import numpy as np
from bokeh.io import output_file, show
from bokeh.models import GeoJSONDataSource
from bokeh.plotting import figure
from bokeh.sampledata.sample_geojson import geojson
import psycopg2
import json
from bokeh.events import ButtonClick
from bokeh.models import Button
from random import random
from bokeh.layouts import column
from bokeh.models import Button
from bokeh.palettes import RdYlBu3
from bokeh.plotting import figure, curdoc
import os
from IPython.core.display import display, HTML
%matplotlib inline



In [465]:
#define the class for sampling
class SelectSample:
    """Draw per-group random samples from a database table for one study wave.

    The constructor reads a JSON configuration file. The other methods build
    SQL text from that configuration and run it on ``self.conn_db`` (opened by
    ``connect_to_db``). Table names, column names and filter expressions are
    interpolated into the SQL verbatim, so the configuration file must come
    from a trusted source.

    Configuration keys read by this class:
        master_db_name, database_port, database_server, user, pwd,
        id_column, investigation_id, wave_id, parent_table_name,
        filtered_table_name, all_samples_table_name,
        filter (list of SQL conditions that are joined with ``and``),
        field_to_randomize (column that defines the groups),
        samples_from_field (mapping of group value -> number of samples).
    Optional keys used by ``create_facility_augmented_database``:
        clinic_facility_table, parent_table_facility_code,
        clinic_table_facility_code.
    """
    #TBD
    #--1. Unit Tests.
    #--2. Add capacity for joining new (as yet unseen) tables and information.
    #--3. Check the JSON file against the parent table name for column existence.
    #--4. Add message set query and message sequence number

    # Name of the temp table written by create_facility_augmented_database()
    # and read back by get_augmented_sample().
    AUGMENTED_TABLE_NAME = "temp_table_for_augmented_facilities"

    def __init__(self,config_json_file):
        """Load the JSON configuration and derive the sampling settings.

        Parameters:
            config_json_file: path of the JSON configuration file.

        Raises:
            KeyError: if a required configuration key is missing.
        """
        self.config_json_file = config_json_file
        # No database connection exists until connect_to_db() is called.
        self.conn_db = None
        self.read_json_config()

        # connection settings
        self.master_db_name = self.data['master_db_name']
        self.database_port = self.data['database_port']
        self.database_server = self.data['database_server']
        self.user = self.data['user']
        self.pwd = self.data['pwd']
        # sampling settings
        self.id_column = self.data['id_column'] # column that is used for the unique identifier
        self.investigation_id = int(self.data['investigation_id']) # investigation number tag
        self.wave_id = int(self.data['wave_id']) # wave number tag
        self.parent_table_name = self.data['parent_table_name']
        self.filtered_table_name = self.data['filtered_table_name']
        # name of the table that keeps all the previous samples
        self.all_samples_table_name = self.data['all_samples_table_name']
        # whether the parent table has duplicated ids; assumed True until
        # check_duplicates() has been run
        self.duplicates = True

        print('master_db_name: ',self.master_db_name)
        print('database_port: ',self.database_port)
        print('database_server: ',self.database_server)
        print('database_user: ',self.user)
        # Never echo the real password: notebook output gets saved and shared.
        print('database_pwd: ','********' if self.pwd else '(not set)')
        print('unique_field: ',self.id_column)
        print('investigation_id: ',self.investigation_id)
        print('all_samples_table: ',self.all_samples_table_name)
        print('population_filter: ',self.data['filter'])
        print('field_to_split: ', self.data['field_to_randomize'])
        print('values_to_use: ', list(self.data['samples_from_field'].keys()))
        print('number_to_select: ', list(self.data['samples_from_field'].values()))

        # The population filter is the conjunction of all configured
        # conditions. A bare string counts as a single condition and an empty
        # list means "no restriction".
        conditions = self.data['filter']
        if isinstance(conditions, str):
            conditions = [conditions]
        self.population_filter = " and ".join(conditions) if conditions else "TRUE"

        self.group_filter = self.data['field_to_randomize']
        # e.g. group_values = ['afr_ZA','eng_ZA','xho_ZA'] with
        #      group_sample_number = [1000,200,250]
        self.group_values = list(self.data['samples_from_field'].keys())
        self.group_sample_number = list(self.data['samples_from_field'].values())
        self.temp_full_sample_table_name = "temp_wave_%d"%(self.wave_id)

    @staticmethod
    def _quote_literal(value):
        """Return ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so a value such as a file path
        containing an apostrophe cannot terminate the literal early.
        """
        return "'%s'"%(str(value).replace("'", "''"))

    def _run_statements(self, statements, commit=False):
        """Execute SQL statements in order on one cursor.

        Parameters:
            statements: iterable of SQL strings.
            commit: when True, commit the connection after the last statement.

        Returns:
            Whatever ``cursor.execute`` returned for the last statement.

        The cursor is closed even if a statement raises.
        """
        cursor = self.conn_db.cursor()
        try:
            result = None
            for statement in statements:
                result = cursor.execute(statement)
            if commit:
                self.conn_db.commit()
            return result
        finally:
            cursor.close()

    def check_duplicates(self):
        """Compare count and distinct count of the id column in the parent table.

        Sets ``self.duplicates`` (True when the two counts differ) and prints
        a summary.

        Returns:
            (count_id_column, count_distinct_id_column): the two one-cell
            DataFrames returned by the count queries.
        """
        count_id_column = pd.read_sql("""
            select count(%s) 
            from %s 
            """%(self.id_column,self.parent_table_name),self.conn_db)
        count_distinct_id_column = pd.read_sql("""
            select count(distinct(%s)) 
            from %s 
            """%(self.id_column,self.parent_table_name),self.conn_db)
        # Read the single cell by position: this yields plain ints for the
        # comparison and the %d formatting, and does not depend on how the
        # database names the aggregate column.
        total = int(count_id_column.iloc[0, 0])
        distinct = int(count_distinct_id_column.iloc[0, 0])
        self.duplicates = total != distinct
        if self.duplicates:
            print('ID Column "%s" is not unique. \n Count %d Count distinct %d'%(self.id_column,total,distinct))
        else:
            print('ID Column "%s" is unique. Yay! \n Count %d Count distinct %d'%(self.id_column,total,distinct))
        return(count_id_column,count_distinct_id_column)

    def read_json_config(self):
        """Parse the JSON configuration file into ``self.data`` and return it."""
        with open(self.config_json_file, encoding="utf-8") as data_file:
            self.data = json.load(data_file)
        return(self.data)

    def connect_to_db(self):
        """Open the psycopg2 connection described by the configuration."""
        self.conn_db=psycopg2.connect(dbname=self.master_db_name,user=self.user,password=self.pwd,
                port=self.database_port, host=self.database_server)

    def disconnect_from_db(self):
        """Close the database connection; does nothing if none is open."""
        connection = getattr(self, 'conn_db', None)
        if connection is not None:
            connection.close()
            self.conn_db = None

    def read_test_data(self,file_to_read):
        """Read a CSV file into a DataFrame."""
        return pd.read_csv(file_to_read)

    def create_facility_augmented_database(self):
        """Create a temp table joining the parent table to the clinic table.

        The join is a left join on the facility code columns, both cast to
        integer. Table and column names come from the configuration keys
        ``clinic_facility_table``, ``parent_table_facility_code`` and
        ``clinic_table_facility_code``; when a key is absent the historical
        default name is used. The attributes set here always match the names
        used in the SQL.
        """
        self.parent_table_facility_code = self.data.get("parent_table_facility_code", "facility_code")
        self.clinic_table_facility_code = self.data.get("clinic_table_facility_code", "facilitycode")
        self.clinic_facility_table = self.data.get("clinic_facility_table", "clinic_facilities_with_gps")
        self.temp_augmented_table = self.AUGMENTED_TABLE_NAME

        self._run_statements([
            # Drop first so the method can be re-run in the same session,
            # like create_filtered_database() does.
            """DROP TABLE IF EXISTS %s;"""%(self.temp_augmented_table),
            """
                    CREATE TEMP TABLE %s as
                    (select * from %s as a
                    left join %s as b
                    on a.%s::integer = b.%s::integer);
                    """%(self.temp_augmented_table,self.parent_table_name,self.clinic_facility_table,
                        self.parent_table_facility_code, self.clinic_table_facility_code),
        ])

    def create_filtered_database(self):
        """Create the filtered temp table from the parent table.

        Rows of the parent table matching ``self.population_filter`` are
        copied, with extra columns recording the investigation id, wave id,
        configuration file name, database name and parent table name.
        Any existing table named ``self.filtered_table_name`` is dropped first.
        """
        self._run_statements([
            """DROP TABLE IF EXISTS %s;"""%(self.filtered_table_name),
            """
        CREATE TEMP TABLE %s as (
                select *,
                %s as investigation_id, 
                %s as wave_id,
                %s as sample_config,
                %s as master_db_name,
                %s as parent_table_name
                from %s 
                where %s 
                );
        """%(self.filtered_table_name,
             self.investigation_id,
             self.wave_id,
             self._quote_literal(self.config_json_file),
             self._quote_literal(self.master_db_name),
             self._quote_literal(self.parent_table_name),
             self.parent_table_name,
             self.population_filter),
        ])

    def create_full_sample_temp_table(self):
        """Create the empty per-wave temp table with the columns of ``group_0``.

        ``group_0`` must already exist (it is created by ``get_group_sample``
        for the first group).
        """
        template_table_name = "group_0"
        self._run_statements([
            """DROP TABLE IF EXISTS %s;"""%(self.temp_full_sample_table_name),
            """CREATE TEMP TABLE  %s (LIKE %s);"""%(self.temp_full_sample_table_name,template_table_name),
        ])

    #check if the table is required for the gates samples
    def check_exists_investigations_gates_table(self):
        """Return whether the all-samples table exists in the ``public`` schema.

        The result is also stored in ``self.exists``.
        """
        sql_query = """SELECT EXISTS 
        (SELECT 1 FROM   pg_tables 
            WHERE  schemaname = 'public' AND tablename = %s);"""%(self._quote_literal(self.all_samples_table_name))
        cursor = self.conn_db.cursor()
        try:
            cursor.execute(sql_query)
            self.exists = cursor.fetchall()[0][0]
        finally:
            cursor.close()
        return self.exists

    def create_all_investigations_gates_table_if_not_exist(self):
        """Create the table holding all investigation samples if it is missing.

        The new table is empty and takes its columns from the per-wave temp
        table; the creation is committed.
        """
        if not self.check_exists_investigations_gates_table():
            self._run_statements([
                # "where 1=2" copies the column layout without any rows.
                """CREATE TABLE %s as (select * from %s where 1=2) ;"""%(self.all_samples_table_name, self.temp_full_sample_table_name),
            ], commit=True)

    def append_group_sample(self,group_id):
        """Append the rows of temp table ``group_<group_id>`` to the wave table.

        The insert is committed.

        Returns:
            The value returned by ``cursor.execute`` for the insert.
        """
        group_table_name = "group_"+str(group_id)
        return self._run_statements([
            """insert into %s select * from %s;"""%(self.temp_full_sample_table_name,group_table_name),
        ], commit=True)

    def append_sample_to_all_investigation(self):
        """Append the rows of the wave table to the all-samples table.

        The insert is committed.

        Returns:
            The value returned by ``cursor.execute`` for the insert.
        """
        return self._run_statements([
            """insert into %s select * from %s;"""%(self.all_samples_table_name,self.temp_full_sample_table_name),
        ], commit=True)

    def sample_groups(self):
        """Draw the configured random sample for every group.

        Returns:
            The ``(sample_count, full_count, sample)`` tuple of the last
            group, or ``(None, None, None)`` when no groups are configured.

        Raises:
            ValueError: propagated from ``get_group_sample`` when a group has
                fewer matching rows than requested.
        """
        last_result = (None, None, None)
        for group_id, group_value in enumerate(self.group_values):
            last_result = self.get_group_sample(group_id,
                                                self.investigation_id,
                                                self.wave_id,
                                                self.filtered_table_name,
                                                self.population_filter,
                                                self.group_filter,
                                                group_value,
                                                int(self.group_sample_number[group_id]))
            print('group_id: ',group_id,group_value)
        return last_result

    def append_groups(self):
        """Append every ``group_<n>`` temp table to the wave table."""
        for group_id in range(len(self.group_values)):
            self.append_group_sample(group_id)

    def get_group_sample(self,group_id,investigation_id, wave_id,parent_table_name,
                         population_filter,group_filter,group_value,samples):
        """Draw a random sample for one group into temp table ``group_<group_id>``.

        Parameters:
            group_id: integer index of the group; names the temp table and is
                stored in its ``group_id`` column.
            investigation_id, wave_id: accepted for interface compatibility;
                not used by this method.
            parent_table_name: table to sample from.
            population_filter: SQL condition applied to every row.
            group_filter: column whose value selects the group.
            group_value: value of ``group_filter`` identifying the group.
            samples: number of rows to draw.

        Returns:
            (sample_count, full_count, sample): one-cell DataFrame with the
            number of sampled rows, one-cell DataFrame with the number of
            rows matching the filters, and the sampled rows.

        Raises:
            ValueError: if fewer than ``samples`` rows were available.
        """
        temp_table_name = "group_"+str(group_id)
        # Shared by the sampling query and the population count.
        group_condition = "%s and %s in (%s)"%(population_filter,group_filter,
                                               self._quote_literal(group_value))
        self._run_statements([
            """DROP TABLE IF EXISTS %s;"""%(temp_table_name),
            # Ordering by a fresh random() column and taking the first
            # `samples` rows gives a uniform random sample.
            """
        CREATE TEMP TABLE %s as (
                select *,
                    random() as random,
                    %d as group_id
                from %s 
                where %s 
                order by random asc 
                limit %d);
        """%(temp_table_name,group_id,
             parent_table_name,group_condition,
             samples),
        ])
        sample_count = pd.read_sql("""
                select count(*) 
                from %s;
                """%(temp_table_name),self.conn_db)
        full_count = pd.read_sql("""
                select count(*) 
                from %s 
                where %s
                """%(parent_table_name,group_condition),self.conn_db)
        sample = pd.read_sql("""
                select * from %s;
                """%(temp_table_name),self.conn_db)
        if int(sample_count.iloc[0, 0]) < samples:
            raise ValueError("The requested sample from %s is too large" % (group_value))
        return(sample_count, full_count, sample)

    def get_final_wave_sample(self):
        """Return the samples that will be added for the final wave"""
        return pd.read_sql("""
            select * 
            from %s 
            """%(self.temp_full_sample_table_name),self.conn_db)

    def get_augmented_sample(self):
        """Return the samples that have had clinic locations augmented using the facility code"""
        return pd.read_sql("""
            select * 
            from %s
            """%(self.AUGMENTED_TABLE_NAME),self.conn_db)

    def get_all_investigations_sample(self):
        """Return the samples already stored in the Gates investigation table"""
        return pd.read_sql("""
            select * 
            from %s 
            """%(self.all_samples_table_name),self.conn_db)

    def json_config_fields(self):
        """Return the configuration fields used in the JSON configuration file"""
        return(self.group_filter,self.group_values)

    def plot_histogram(self):
        """Bar-plot the number of sampled ids per group in the wave table."""
        per_group = self.get_final_wave_sample().groupby(self.data['field_to_randomize']).count()
        # `kind` must be passed by keyword: Series.plot rejects positional
        # arguments in current pandas releases.
        per_group[self.data['id_column']].plot(kind='bar')

    def return_pandas_sql_query(self,sql):
        """Run ``sql`` on the open connection and return the result as a DataFrame.

        Typically used to inspect the tables created by this class:
        1. parent_table_name
        2. gates_investigation_%d_filtered
        3. all_gates_investigation_samples
        4. temp_wave_%d
        5. group_%d
        """
        return(pd.read_sql(sql,self.conn_db))

    def clinic_locations_of_participants(self):
        """Return all rows of the wave table for the configured ``wave_id``.

        The wave table name is derived from the configuration instead of
        being fixed to wave 1. ``generate_folium_map`` expects ``lat`` and
        ``lon`` columns in the result.
        """
        sql_execution = """
                select * from
                %s
                ;
        """%(self.temp_full_sample_table_name)
        return(pd.read_sql(sql_execution,self.conn_db))

    def generate_folium_map(self):
        """Plot participant clinic locations on a clustered folium map.

        Rows without ``lon``/``lat`` are skipped. The map is saved to
        ``base_map.html`` and returned.
        """
        import folium
        try:
            # MarkerCluster lives in folium.plugins in current releases.
            from folium.plugins import MarkerCluster
        except ImportError:
            MarkerCluster = folium.MarkerCluster
        locations = self.clinic_locations_of_participants().dropna(subset=['lon','lat'])
        m = folium.Map(location=[-30.92, 24.42],zoom_start=5,width=800, height=480)
        marker_cluster = MarkerCluster().add_to(m)
        for latitude, longitude in zip(locations.lat, locations.lon):
            folium.Marker([latitude, longitude]).add_to(marker_cluster)
        m.save('base_map.html')
        return(m)

    def return_conn_db(self):
        """Return the current database connection (None when not connected)."""
        return self.conn_db

In [1]:
import pandas as pd

In [2]:
# Load the NurseConnect mobi users export; the path is relative to the
# notebook's working directory.
mobi_users = pd.read_csv('NurseConnect_mobi_users.csv')

In [8]:
# Rename the two columns to the names used for the uploaded table
# (pandas raises if the frame does not have exactly two columns).
mobi_users.columns = ['mobi_msisdn','facility_code']

In [9]:
# Display the frame to check the renamed columns before uploading.
mobi_users

Unnamed: 0,mobi_msisdn,facility_code
0,27849862597,
1,27849572147,
2,27849278227,
3,27849221829,
4,27848882750,236733.0
5,27848727769,
6,27848205119,
7,27847257566,
8,27847224516,474051.0
9,27847017995,147360.0


In [10]:
import os

from sqlalchemy import create_engine

# Upload the mobi users frame as table 'nurse_connect_mobi_users'.
# The connection URL can be supplied through the NURSE_CONNECT_DB_URL
# environment variable so that credentials need not live in the notebook.
# NOTE(review): the fallback below still embeds a password; it is kept only
# so the cell behaves as before when the variable is unset. Remove it once
# the environment variable is configured.
db_url = os.environ.get(
    'NURSE_CONNECT_DB_URL',
    'postgresql://charlescopley:parham@localhost:6432/charlescopley')
engine = create_engine(db_url)
# With the default if_exists='fail', to_sql raises if the table already
# exists, so re-running this cell will not overwrite a previous upload.
mobi_users.to_sql('nurse_connect_mobi_users', engine)

In [12]:
# Release the engine's pooled database connections now that the upload is done.
engine.dispose()
