In [None]:
!pip install --quiet accelerate peft bitsandbytes transformers datasets

In [2]:
import torch
import sqlite3
from peft import LoraConfig, AutoPeftModelForCausalLM, PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datasets import load_dataset

In [None]:
# Colab only: mount Google Drive at /content/drive.
# force_remount=True asks for a fresh mount even when one is already present.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [None]:
# Evaluation data: the "dev" split of the Hugging Face dataset named below.
dataset_id = "NESPED-GEN/spider_selector_schemaReduzido"
dataset_split = "dev"
dataset =  load_dataset(dataset_id,split=dataset_split)
# pandas view of the same split; not referenced by the other cells visible here.
df = dataset.to_pandas()

### Modelo que será Testado

In [7]:
def get_model_and_tokenizer(model_id, load_in_4bit=False):
    """Load a causal language model and its tokenizer.

    Args:
        model_id: Model identifier or path accepted by ``from_pretrained``.
        load_in_4bit: When True, load the weights with 4-bit NF4 quantization
            (bitsandbytes). Defaults to False, which loads plain float16
            weights exactly as this function did before the parameter existed.

    Returns:
        A ``(model, tokenizer)`` tuple. The tokenizer's pad token is set to its
        EOS token.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    # Reuse EOS as the padding token.
    tokenizer.pad_token = tokenizer.eos_token

    model_kwargs = {
        "trust_remote_code": True,
        "torch_dtype": torch.float16,
        "device_map": "auto",
        "use_cache": True,
    }

    # The quantization config used to be built unconditionally but was never
    # handed to from_pretrained, so it had no effect. It is now wired in, but
    # only on request, so existing float16 runs keep producing the same model.
    if load_in_4bit:
        model_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=False,
        )

    model = AutoModelForCausalLM.from_pretrained(model_id, **model_kwargs)
    return model, tokenizer

In [8]:
import os
from getpass import getpass

from huggingface_hub import login

# Do not paste the token into the notebook: it would be saved with the file.
# Read it from the HF_TOKEN environment variable, or prompt for it (the prompt
# does not echo the input) when the variable is unset or empty.
token = os.environ.get("HF_TOKEN") or getpass("Hugging Face token: ")
login(token=token)

In [None]:
# Placeholder: set model_name to the model id/path to evaluate before running.
# It is forwarded as model_id to get_model_and_tokenizer.
model_name = ""
model, tokenizer = get_model_and_tokenizer(model_name)

### Prompt e Função para Obter Resposta Gerada pelo Modelo **[ajustar de acordo com o modelo]**

In [13]:
from transformers import pipeline

# Settings applied to every call of `pipe`.
params = {
    "task": "text-generation",
    "eos_token_id": tokenizer.eos_token_id,
    "pad_token_id": tokenizer.eos_token_id,
    "max_new_tokens": 250,
    # Greedy decoding. "temperature" is intentionally not set: it only applies
    # when sampling, and the previous value of 0.0 is not a valid sampling
    # temperature, so passing it alongside do_sample=False was contradictory.
    "do_sample": False,
    # Return only the newly generated text, without the prompt.
    "return_full_text": False,
    # End-of-turn marker of the chat format; adjust to the model under test.
    # NOTE(review): the pipeline is expected to turn this into an EOS token id,
    # which may take precedence over "eos_token_id" above - confirm against the
    # installed transformers version.
    "stop_sequence": "<|im_end|>",
}

pipe = pipeline(model=model, tokenizer=tokenizer, **params)

Device set to use cuda:0


In [None]:
def _quote_identifier(name):
    """Return ``name`` as a double-quoted SQLite identifier (embedded quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def SQLDatabase(db_path, num_examples=0):
    """Render the schema of a SQLite database file as lower-cased CREATE TABLE text.

    Every table listed in ``sqlite_master`` becomes one ``CREATE TABLE`` statement
    with its columns, a ``PRIMARY KEY`` clause (when the table has one) and one
    ``FOREIGN KEY`` clause per foreign-key column. When ``num_examples`` is
    positive, each statement is followed by a ``/* ... */`` comment holding up to
    that many rows of the table, tab-separated.

    Args:
        db_path: Path to an existing SQLite database file.
        num_examples: Number of sample rows to append per table (0 = none).

    Returns:
        The schema text, without trailing newlines.

    Raises:
        FileNotFoundError: If ``db_path`` is not an existing file.
            ``sqlite3.connect`` would otherwise silently create an empty
            database there and this function would return an empty schema.
    """
    import os  # local import: keeps this cell independent of the import cell

    if not os.path.isfile(db_path):
        raise FileNotFoundError(f"SQLite database not found: {db_path}")

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        schema_str = ""

        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        table_names = [row[0] for row in cursor.fetchall()]

        for table_name in table_names:
            quoted_table = _quote_identifier(table_name)

            # table_info rows: (cid, name, type, notnull, dflt_value, pk)
            cursor.execute(f"PRAGMA table_info({quoted_table})")
            included_columns = cursor.fetchall()

            clauses = [
                f"        {col[1].replace(chr(34), '').lower()} {col[2].upper()}"
                for col in included_columns
            ]

            # `pk` is 0 for non-key columns and the 1-based position inside the
            # primary key otherwise, so a composite key has pk values 1, 2, ...
            # Keep every key column, in key order (testing `pk == 1` dropped all
            # but the first column of a composite key).
            key_columns = sorted(
                (col for col in included_columns if col[5] > 0),
                key=lambda col: col[5],
            )
            if key_columns:
                primary_keys_str = ", ".join(
                    col[1].replace('"', '').lower() for col in key_columns
                )
                clauses.append(f"        PRIMARY KEY ({primary_keys_str})")

            # foreign_key_list rows: (id, seq, table, from, to, on_update, on_delete, match)
            cursor.execute(f"PRAGMA foreign_key_list({quoted_table})")
            for fk in cursor.fetchall():
                if fk[4] is None:
                    # The constraint names no referenced column, so there is
                    # nothing to render. Report the row and skip it, as before,
                    # but without a bare `except` hiding unrelated errors.
                    print(fk)
                    continue
                fk_col = fk[3].replace('"', '')
                ref_table = fk[2].replace('"', '')
                ref_col = fk[4].replace('"', '')
                clauses.append(
                    f"        FOREIGN KEY ({fk_col.lower()}) "
                    f"REFERENCES {ref_table.lower()}({ref_col.lower()})"
                )

            schema_str += f"CREATE TABLE {table_name.lower()} (\n"
            schema_str += ",\n".join(clauses)
            schema_str += "\n);\n\n"

            if num_examples > 0:
                # Quote identifiers so names containing spaces or SQL keywords
                # still work; int() keeps the LIMIT a plain integer literal.
                column_list = ", ".join(_quote_identifier(col[1]) for col in included_columns)
                cursor.execute(
                    f"SELECT {column_list} FROM {quoted_table} LIMIT {int(num_examples)};"
                )
                rows = cursor.fetchall()
                schema_str += f"/*\n{len(rows)} rows from {table_name} table:\n"
                schema_str += "\t".join(
                    col[1].lower().replace('"', '') for col in included_columns
                ) + "\n"
                for row in rows:
                    schema_str += "\t".join(map(str, row)) + "\n"
                schema_str += "*/\n\n"

        return schema_str.rstrip("\n")
    finally:
        # Release the connection even when one of the queries above raises.
        conn.close()

In [15]:
def generate_response(question, schema):
    """Ask the model which tables and columns of ``schema`` a question refers to.

    Builds a system + user chat prompt with the tokenizer's chat template,
    appends an opening ```json fence so the model continues straight into the
    JSON body, and runs the module-level ``pipe``.

    Args:
        question: The user question.
        schema: Database schema text, embedded in the prompt inside a sql fence.

    Returns:
        A ``(generated_text, token_count)`` tuple, where ``token_count`` is the
        number of tokens obtained by re-encoding ``generated_text``.
    """
    system = "Given a user question and the schema of a database, your task is to generate an JSON with the names of tables and columns of the schema that the question is referring to."

    messages = [
        {'role': 'system', 'content': system},
        {'role': 'user', 'content': f"# Schema:\n```sql\n{schema}\n```\n\n# Question: {question}"},
    ]

    prompt = pipe.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # Pre-fill the start of the answer so generation begins inside the JSON block.
    prompt += "```json"

    generated_text = pipe(prompt)[0]["generated_text"]

    # Count only the tokens of the text itself: by default encode() may also add
    # the tokenizer's special tokens (e.g. BOS), which would inflate the count.
    token_ids = pipe.tokenizer.encode(generated_text, add_special_tokens=False)

    return generated_text, len(token_ids)

### Test and save results

In [None]:
# Placeholders - fill in before running.
# database_path: prefix used by the test loop as
#   f'{database_path}{db_id}/{db_id}.sqlite', so it should end with '/'.
# out_path: text file to which the test loop appends one parsed result per line.
database_path = ".../"
out_path = f"....txt"
out_path

In [None]:
import ast
import json
import statistics
import time

from tqdm import tqdm


def _parse_model_output(raw_text):
    """Extract and parse the JSON payload of one model answer.

    The payload is the text between a ```json fence and the next ``` fence, or,
    when no ```json fence is present, everything before the first ``` fence.

    NOTE: this deliberately replaces the former ``eval()`` call. The text comes
    from the model and must never be executed. It is parsed as JSON first and,
    failing that, as a Python literal (covers single-quoted dicts and trailing
    commas).

    Returns:
        The parsed object, or ``{'erro': ['']}`` when the payload cannot be
        parsed (the payload is printed in that case).
    """
    try:
        payload = raw_text.split('```json')[1].split('```')[0]
    except IndexError:
        payload = raw_text.split('```')[0]

    try:
        return json.loads(payload)
    except ValueError:
        pass

    try:
        return ast.literal_eval(payload.strip())
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # Named exceptions only: a bare `except` would also swallow
        # KeyboardInterrupt and make the run impossible to stop.
        print(payload)
        return {'erro': ['']}


result = []
response_time = []
schema_cache = {}  # db_id -> schema text; many examples share one database

with open(out_path, 'a', encoding='utf-8') as file:
    for example in tqdm(dataset, desc="Test ..."):

        db_id = example['db_id']
        if db_id not in schema_cache:
            schema_cache[db_id] = SQLDatabase(f'{database_path}{db_id}/{db_id}.sqlite')
        schema = schema_cache[db_id]

        question = example['question_en']

        # Time only the generation call; perf_counter is the clock meant for
        # measuring elapsed durations.
        start_time = time.perf_counter()
        output, tokens_count = generate_response(question, schema)
        end_time = time.perf_counter()

        response_time.append(end_time - start_time)

        output = _parse_model_output(output)

        result.append(output)

        # One parsed result per line, flushed so partial runs are not lost.
        file.write(f"{output}\n")
        file.flush()

if response_time:
    print(f'\naverage time = {statistics.mean(response_time)}')
else:
    # statistics.mean raises on an empty list (e.g. an empty dataset split).
    print('\naverage time = n/a (no examples were processed)')